Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/09 06:40:16 UTC

[flink] 02/02: [FLINK-13587][table-planner-blink] Fix operator names that are not set in the blink planner

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4517949700b0401923eaa0bd6ec6704f05b7bdf3
Author: Jark Wu <im...@gmail.com>
AuthorDate: Mon Aug 5 22:39:18 2019 +0800

    [FLINK-13587][table-planner-blink] Fix operator names that are not set in the blink planner
    
    This closes #9363
---
 ...iterImpl.java => RelDescriptionWriterImpl.java} |  23 ++--
 .../planner/codegen/CorrelateCodeGenerator.scala   |   7 +-
 .../table/planner/delegation/BatchPlanner.scala    |   2 +-
 .../table/planner/delegation/StreamPlanner.scala   |   2 +-
 .../table/planner/plan/nodes/FlinkRelNode.scala    |  27 ++---
 .../table/planner/plan/nodes/calcite/Expand.scala  |  22 ++--
 .../planner/plan/nodes/common/CommonCalc.scala     |   7 +-
 .../plan/nodes/common/CommonLookupJoin.scala       |   6 +-
 .../plan/nodes/common/CommonPhysicalJoin.scala     |   7 +-
 .../plan/nodes/physical/batch/BatchExecCalc.scala  |   3 +-
 .../nodes/physical/batch/BatchExecCorrelate.scala  |   4 +-
 .../nodes/physical/batch/BatchExecExpand.scala     |   3 +-
 .../batch/BatchExecGroupAggregateBase.scala        |  14 +--
 .../physical/batch/BatchExecHashAggregate.scala    |   5 -
 .../batch/BatchExecHashAggregateBase.scala         |   4 +-
 .../nodes/physical/batch/BatchExecHashJoin.scala   |  12 +-
 .../batch/BatchExecHashWindowAggregate.scala       |   4 -
 .../batch/BatchExecHashWindowAggregateBase.scala   |   4 +-
 .../plan/nodes/physical/batch/BatchExecLimit.scala |   7 +-
 .../batch/BatchExecLocalHashAggregate.scala        |   2 -
 .../batch/BatchExecLocalHashWindowAggregate.scala  |   4 -
 .../batch/BatchExecLocalSortAggregate.scala        |   3 -
 .../batch/BatchExecLocalSortWindowAggregate.scala  |   2 -
 .../physical/batch/BatchExecNestedLoopJoin.scala   |  13 +-
 .../physical/batch/BatchExecOverAggregate.scala    |   6 +-
 .../plan/nodes/physical/batch/BatchExecRank.scala  |  10 +-
 .../plan/nodes/physical/batch/BatchExecSort.scala  |   2 +-
 .../physical/batch/BatchExecSortAggregate.scala    |   6 -
 .../batch/BatchExecSortAggregateBase.scala         |   4 +-
 .../nodes/physical/batch/BatchExecSortLimit.scala  |   9 +-
 .../physical/batch/BatchExecSortMergeJoin.scala    |  10 +-
 .../batch/BatchExecSortWindowAggregate.scala       |   5 -
 .../batch/BatchExecSortWindowAggregateBase.scala   |   4 +-
 .../nodes/physical/batch/BatchExecValues.scala     |   1 +
 .../nodes/physical/stream/StreamExecCalc.scala     |   2 +-
 .../physical/stream/StreamExecCorrelate.scala      |   1 +
 .../physical/stream/StreamExecDeduplicate.scala    |  13 +-
 .../nodes/physical/stream/StreamExecExpand.scala   |   3 +-
 .../stream/StreamExecGlobalGroupAggregate.scala    |   2 +-
 .../physical/stream/StreamExecGroupAggregate.scala |   2 +-
 .../stream/StreamExecGroupWindowAggregate.scala    |  10 +-
 .../StreamExecIncrementalGroupAggregate.scala      |   2 +-
 .../nodes/physical/stream/StreamExecJoin.scala     |   9 +-
 .../nodes/physical/stream/StreamExecLimit.scala    |   2 +-
 .../stream/StreamExecLocalGroupAggregate.scala     |   2 +-
 .../nodes/physical/stream/StreamExecMatch.scala    |   2 +-
 .../physical/stream/StreamExecOverAggregate.scala  |   2 +-
 .../nodes/physical/stream/StreamExecRank.scala     |  16 +--
 .../nodes/physical/stream/StreamExecSort.scala     |   2 +-
 .../physical/stream/StreamExecSortLimit.scala      |   9 +-
 .../physical/stream/StreamExecTemporalJoin.scala   |   8 +-
 .../physical/stream/StreamExecTemporalSort.scala   |   4 +-
 .../nodes/physical/stream/StreamExecValues.scala   |   1 +
 .../stream/StreamExecWatermarkAssigner.scala       |  18 +--
 .../physical/stream/StreamExecWindowJoin.scala     |  11 +-
 .../planner/plan/utils/ExecNodePlanDumper.scala    |   2 +-
 .../table/planner/plan/utils/FlinkRelOptUtil.scala |   2 +-
 .../table/planner/plan/utils/RelExplainUtil.scala  |  81 +++----------
 .../flink/table/planner/plan/utils/ScanUtil.scala  |   5 +-
 .../resources/explain/testGetStatsFromCatalog.out  |   6 +-
 .../apache/flink/table/api/batch/ExplainTest.xml   | 104 ++++++++--------
 .../apache/flink/table/api/stream/ExplainTest.xml  | 132 ++++++++++-----------
 .../table/planner/plan/batch/sql/LimitTest.xml     |   8 +-
 .../planner/plan/batch/sql/SetOperatorsTest.xml    |   6 +-
 .../table/planner/plan/batch/sql/SortLimitTest.xml |   8 +-
 .../table/planner/plan/batch/sql/UnnestTest.xml    |  20 ++--
 .../table/planner/plan/batch/sql/ValuesTest.xml    |  22 ++--
 .../plan/batch/sql/join/BroadcastHashJoinTest.xml  |   2 +-
 .../plan/batch/sql/join/NestedLoopJoinTest.xml     |   6 +-
 .../plan/batch/sql/join/SortMergeJoinTest.xml      |   2 +-
 .../logical/CalcPruneAggregateCallRuleTest.xml     |  10 +-
 .../rules/logical/FlinkAggregateRemoveRuleTest.xml |   8 +-
 .../rules/logical/FlinkLimit0RemoveRuleTest.xml    |  18 +--
 .../rules/logical/FlinkPruneEmptyRulesTest.xml     |   2 +-
 .../plan/rules/logical/LogicalUnnestRuleTest.xml   |  20 ++--
 .../logical/ProjectPruneAggregateCallRuleTest.xml  |   4 +-
 .../logical/subquery/SubQuerySemiJoinTest.xml      |   4 +-
 .../table/planner/plan/stream/sql/LimitTest.xml    |  24 ++--
 .../plan/stream/sql/MiniBatchIntervalInferTest.xml |  30 ++---
 .../table/planner/plan/stream/sql/RankTest.xml     |   4 +-
 .../planner/plan/stream/sql/SetOperatorsTest.xml   |   6 +-
 .../planner/plan/stream/sql/SortLimitTest.xml      |  32 ++---
 .../table/planner/plan/stream/sql/UnnestTest.xml   |  20 ++--
 .../table/planner/plan/stream/sql/ValuesTest.xml   |  22 ++--
 .../planner/plan/stream/sql/join/JoinTest.xml      |   2 +-
 .../plan/stream/sql/join/WindowJoinTest.xml        |   2 +-
 .../flink/table/planner/utils/TableTestBase.scala  |  26 ++--
 87 files changed, 405 insertions(+), 600 deletions(-)
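
The gist of the change: many of the Transformations created in translateToPlanInternal were
named with hard-coded strings (e.g. "GroupAggregate", "LookupJoin") or ad-hoc getOperatorName
helpers, and several got no name at all. The patch routes all of them through the new
FlinkRelNode#getRelDetailedDescription. A minimal sketch of the recurring pattern in the hunks
below (the example description string is hypothetical and depends on the query):

    // before: hard-coded name, most node attributes lost
    new OneInputTransformation(
      inputTransformation, "GroupAggregate", operator, outRowType, parallelism)

    // after: the operator name is the node's detailed relational description,
    // e.g. "GroupAggregate(groupBy=[a], select=[a, COUNT(b) AS cnt])" (hypothetical)
    new OneInputTransformation(
      inputTransformation, getRelDetailedDescription, operator, outRowType, parallelism)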

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RelDisplayNameWriterImpl.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RelDescriptionWriterImpl.java
similarity index 81%
rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RelDisplayNameWriterImpl.java
rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RelDescriptionWriterImpl.java
index b7f6d25..ebdc0d2 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RelDisplayNameWriterImpl.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/utils/RelDescriptionWriterImpl.java
@@ -34,17 +34,19 @@ import java.util.List;
 /**
  * Converts a {@link RelNode} to a string with only the information from the RelNode itself,
  * without the information from its inputs. This is mainly used to generate
- * {@link FlinkRelNode#getDisplayName()}.
+ * {@link FlinkRelNode#getRelDetailedDescription()}.
  */
-public class RelDisplayNameWriterImpl implements RelWriter {
+public class RelDescriptionWriterImpl implements RelWriter {
 
-	private static final String STREAM_EXEC = "StreamExec";
-	private static final String BATCH_EXEC = "BatchExec";
+	/**
+	 * All the supported prefixes of RelNode class names (i.e. all implementations of {@link FlinkRelNode}).
+	 */
+	private static final String[] REL_TYPE_NAME_PREFIXES = {"StreamExec", "BatchExec", "FlinkLogical"};
 
 	private final PrintWriter pw;
 	private final List<Pair<String, Object>> values = new ArrayList<>();
 
-	public RelDisplayNameWriterImpl(PrintWriter pw) {
+	public RelDescriptionWriterImpl(PrintWriter pw) {
 		this.pw = pw;
 	}
 
@@ -95,12 +97,11 @@ public class RelDisplayNameWriterImpl implements RelWriter {
 
 	private String getNodeTypeName(RelNode rel) {
 		String typeName = rel.getRelTypeName();
-		if (typeName.startsWith(STREAM_EXEC)) {
-			return typeName.substring(STREAM_EXEC.length());
-		} else if (typeName.startsWith(BATCH_EXEC)) {
-			return typeName.substring(BATCH_EXEC.length());
-		} else {
-			throw new IllegalStateException("Unsupported RelNode class name '" + typeName + "'");
+		for (String prefix : REL_TYPE_NAME_PREFIXES) {
+			if (typeName.startsWith(prefix)) {
+				return typeName.substring(prefix.length());
+			}
 		}
+		throw new IllegalStateException("Unsupported RelNode class name '" + typeName + "'");
 	}
 }
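
The loop above generalizes the previous two-branch prefix check so that FlinkLogical class
names are stripped as well. A self-contained Scala sketch of the same stripping logic
(illustrative only, not the actual Java class):

    val relTypeNamePrefixes = Seq("StreamExec", "BatchExec", "FlinkLogical")

    def nodeTypeName(relTypeName: String): String =
      relTypeNamePrefixes
        .collectFirst { case p if relTypeName.startsWith(p) => relTypeName.drop(p.length) }
        .getOrElse(throw new IllegalStateException(
          s"Unsupported RelNode class name '$relTypeName'"))

    // nodeTypeName("StreamExecCalc")   == "Calc"
    // nodeTypeName("FlinkLogicalJoin") == "Join"
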
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
index c292c64..31bed9e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
@@ -135,12 +135,7 @@ object CorrelateCodeGenerator {
 
     new OneInputTransformation(
       inputTransformation,
-      RelExplainUtil.correlateOpName(
-        inputRelType,
-        rexCall,
-        sqlFunction,
-        outDataType,
-        expression),
+      ruleDescription,
       substituteStreamOperator,
       BaseRowTypeInfo.of(returnType),
       parallelism)
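
generateCorrelateTransformation no longer assembles an operator name from the input rel type
and expressions; it takes the caller-supplied ruleDescription as a placeholder. The Correlate
exec nodes then overwrite that placeholder, as the BatchExecCorrelate and StreamExecCorrelate
hunks below show:

    // pattern in the Correlate nodes (mirrors the hunks below)
    val transformation = CorrelateCodeGenerator.generateCorrelateTransformation(
      config, operatorCtx, inputTransformation /* , remaining args elided */)
    transformation.setName(getRelDetailedDescription)  // final operator name
    transformation
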
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
index 213be6a..56a2fa2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala
@@ -104,7 +104,7 @@ class BatchPlanner(
     val explainLevel = if (extended) {
       SqlExplainLevel.ALL_ATTRIBUTES
     } else {
-      SqlExplainLevel.EXPPLAN_ATTRIBUTES
+      SqlExplainLevel.DIGEST_ATTRIBUTES
     }
     sb.append(ExecNodePlanDumper.dagToString(execNodes, explainLevel))
     sb.append(System.lineSeparator)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index 091083a..e5add45 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -95,7 +95,7 @@ class StreamPlanner(
     val (explainLevel, withRetractTraits) = if (extended) {
       (SqlExplainLevel.ALL_ATTRIBUTES, true)
     } else {
-      (SqlExplainLevel.EXPPLAN_ATTRIBUTES, false)
+      (SqlExplainLevel.DIGEST_ATTRIBUTES, false)
     }
     sb.append(ExecNodePlanDumper.dagToString(
       execNodes,
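
Both planners move the default (non-extended) explain from EXPPLAN_ATTRIBUTES to
DIGEST_ATTRIBUTES. A plausible reading: EXPPLAN_ATTRIBUTES, the level that prints only the
attributes contributing to the plan output, is now reserved for generating operator
descriptions via getRelDetailedDescription, so the plain explain() output switches to the
digest attributes instead. The selection after the patch (mirrors BatchPlanner):

    val explainLevel =
      if (extended) SqlExplainLevel.ALL_ATTRIBUTES    // verbose, includes cost
      else SqlExplainLevel.DIGEST_ATTRIBUTES          // attributes in the rel digest
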
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/FlinkRelNode.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/FlinkRelNode.scala
index 62a4d45..75fddec 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/FlinkRelNode.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/FlinkRelNode.scala
@@ -18,12 +18,13 @@
 
 package org.apache.flink.table.planner.plan.nodes
 
-import org.apache.calcite.rel.{RelNode, RelWriter}
+import org.apache.flink.table.planner.plan.nodes.ExpressionFormat.ExpressionFormat
+import org.apache.flink.table.planner.plan.utils.RelDescriptionWriterImpl
+
+import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.SqlAsOperator
 import org.apache.calcite.sql.SqlKind._
-import org.apache.flink.table.planner.plan.nodes.ExpressionFormat.ExpressionFormat
-import org.apache.flink.table.planner.plan.utils.RelDisplayNameWriterImpl
 
 import java.io.{PrintWriter, StringWriter}
 
@@ -35,13 +36,17 @@ import scala.collection.JavaConversions._
 trait FlinkRelNode extends RelNode {
 
   /**
-    * Returns the display name of the RelNode. The display name are usually used to be displayed
-    * in log or Web UI.
+    * Returns a string that describes the relational expression in detail, including the
+    * attributes that contribute to the plan output.
+    *
+    * This method leverages [[RelNode#explain]] with
+    * [[org.apache.calcite.sql.SqlExplainLevel.EXPPLAN_ATTRIBUTES]] explain level to generate
+    * the description.
     */
-  def getDisplayName: String = {
+  def getRelDetailedDescription: String = {
     val sw = new StringWriter
     val pw = new PrintWriter(sw)
-    val relWriter = new RelDisplayNameWriterImpl(pw)
+    val relWriter = new RelDescriptionWriterImpl(pw)
     this.explain(relWriter)
     sw.toString
   }
@@ -114,14 +119,6 @@ trait FlinkRelNode extends RelNode {
         throw new IllegalArgumentException(s"Unknown expression type '${expr.getClass}': $expr")
     }
   }
-
-  def getExpressionFormat(pw: RelWriter): ExpressionFormat = pw match {
-    // infix format is more readable for displaying
-    case _: RelDisplayNameWriterImpl => ExpressionFormat.Infix
-    // traditional writer prefers prefix expression format, e.g. +(x, y)
-    case _ => ExpressionFormat.Prefix
-  }
-
 }
 
 /**
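
getRelDetailedDescription becomes the single source for operator names: it runs the node's own
explain() through RelDescriptionWriterImpl, which prints the node without its inputs and with
the more readable infix expression form. A usage sketch (the output string is hypothetical):

    // assuming `calc` is a StreamExecCalc instance
    val name = calc.getRelDetailedDescription
    // e.g. "Calc(select=[a, b], where=[a > 0])" (hypothetical) -- the node type
    // with its "StreamExec" prefix stripped, attributes rendered infix
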
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Expand.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Expand.scala
index 093f87e..b5ea110 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Expand.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Expand.scala
@@ -23,6 +23,7 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
 import org.apache.calcite.rex.{RexInputRef, RexLiteral, RexNode}
+import org.apache.calcite.sql.SqlExplainLevel
 import org.apache.calcite.util.Litmus
 
 import java.util
@@ -82,14 +83,19 @@ abstract class Expand(
 
   override def explainTerms(pw: RelWriter): RelWriter = {
     val names = outputRowType.getFieldNames
-    val terms = projects.map {
-      project =>
-        project.zipWithIndex.map {
-          case (r: RexInputRef, i: Int) => s"${names.get(i)}=[${r.getName}]"
-          case (l: RexLiteral, i: Int) => s"${names.get(i)}=[${l.getValue3}]"
-          case (o, _) => s"$o"
-        }.mkString("{", ", ", "}")
-    }.mkString(", ")
+    val terms = if (pw.getDetailLevel == SqlExplainLevel.EXPPLAN_ATTRIBUTES) {
+      // improve the readability
+      names.mkString(", ")
+    } else {
+      projects.map {
+        project =>
+          project.zipWithIndex.map {
+            case (r: RexInputRef, i: Int) => s"${names.get(i)}=[${r.getName}]"
+            case (l: RexLiteral, i: Int) => s"${names.get(i)}=[${l.getValue3}]"
+            case (o, _) => s"$o"
+          }.mkString("{", ", ", "}")
+      }.mkString(", ")
+    }
     super.explainTerms(pw).item("projects", terms)
   }
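
For Expand, the projects item now depends on the writer's detail level: at EXPPLAN_ATTRIBUTES
(the level used for operator descriptions) only the output field names are printed, while all
other levels keep the full per-projection rendering. Hypothetical output for a two-projection
expand (field names and literals invented for illustration):

    // EXPPLAN_ATTRIBUTES (operator description, compact):
    //   projects=[a, b, $e]
    // other levels (full digest form):
    //   projects=[{a, null, 1}, {null, b, 2}]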
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonCalc.scala
index d50b11f..532e9df 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonCalc.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonCalc.scala
@@ -20,8 +20,7 @@ package org.apache.flink.table.planner.plan.nodes.common
 
 import org.apache.flink.table.planner.plan.nodes.ExpressionFormat.ExpressionFormat
 import org.apache.flink.table.planner.plan.nodes.{ExpressionFormat, FlinkRelNode}
-import org.apache.flink.table.planner.plan.utils.RelExplainUtil.conditionToString
-
+import org.apache.flink.table.planner.plan.utils.RelExplainUtil.{conditionToString, preferExpressionFormat}
 import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
 import org.apache.calcite.rel.core.Calc
 import org.apache.calcite.rel.metadata.RelMetadataQuery
@@ -60,9 +59,9 @@ abstract class CommonCalc(
 
   override def explainTerms(pw: RelWriter): RelWriter = {
     pw.input("input", getInput)
-      .item("select", projectionToString())
+      .item("select", projectionToString(preferExpressionFormat(pw)))
       .itemIf("where",
-        conditionToString(calcProgram, getExpressionString),
+        conditionToString(calcProgram, getExpressionString, preferExpressionFormat(pw)),
         calcProgram.getCondition != null)
   }
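
CommonCalc (and the join nodes below) now ask RelExplainUtil.preferExpressionFormat(pw) which
syntax to use, replacing the getExpressionFormat helper removed from FlinkRelNode above:
descriptions get infix expressions, digest output keeps Calcite's traditional prefix form.
The two formats for the same condition (hypothetical fields):

    // ExpressionFormat.Infix  (RelDescriptionWriterImpl): a > 0 AND b < 10
    // ExpressionFormat.Prefix (traditional digest form):  AND(>(a, 0), <(b, 10))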
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala
index 88800da..a6163da 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala
@@ -36,6 +36,7 @@ import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.{
 import org.apache.flink.table.planner.plan.nodes.FlinkRelNode
 import org.apache.flink.table.planner.plan.utils.LookupJoinUtil._
 import org.apache.flink.table.planner.plan.utils.{JoinTypeUtil, RelExplainUtil}
+import org.apache.flink.table.planner.plan.utils.RelExplainUtil.preferExpressionFormat
 import org.apache.flink.table.planner.utils.TableConfigUtils.getMillisecondFromConfigDuration
 import org.apache.flink.table.runtime.operators.join.lookup.{AsyncLookupJoinRunner, AsyncLookupJoinWithCalcRunner, LookupJoinRunner, LookupJoinWithCalcRunner}
 import org.apache.flink.table.runtime.types.ClassLogicalTypeConverter
@@ -126,7 +127,8 @@ abstract class CommonLookupJoin(
     val resultFieldNames = getRowType.getFieldNames.asScala.toArray
     val lookupableSource = tableSource.asInstanceOf[LookupableTableSource[_]]
     val whereString = calcOnTemporalTable match {
-      case Some(calc) => RelExplainUtil.conditionToString(calc, getExpressionString)
+      case Some(calc) => RelExplainUtil.conditionToString(
+        calc, getExpressionString, preferExpressionFormat(pw))
       case None => "N/A"
     }
 
@@ -343,7 +345,7 @@ abstract class CommonLookupJoin(
 
     new OneInputTransformation(
       inputTransformation,
-      "LookupJoin",
+      getRelDetailedDescription,
       operator,
       BaseRowTypeInfo.of(resultRowType),
       inputTransformation.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalJoin.scala
index a7844cb..7cff990 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonPhysicalJoin.scala
@@ -19,7 +19,8 @@
 package org.apache.flink.table.planner.plan.nodes.common
 
 import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel
-import org.apache.flink.table.planner.plan.utils.{JoinTypeUtil, JoinUtil, RelExplainUtil}
+import org.apache.flink.table.planner.plan.utils.{JoinTypeUtil, JoinUtil}
+import org.apache.flink.table.planner.plan.utils.RelExplainUtil.preferExpressionFormat
 import org.apache.flink.table.runtime.operators.join.FlinkJoinType
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
@@ -78,8 +79,8 @@ abstract class CommonPhysicalJoin(
   override def explainTerms(pw: RelWriter): RelWriter = {
     pw.input("left", getLeft).input("right", getRight)
       .item("joinType", flinkJoinType.toString)
-      .item("where",
-        RelExplainUtil.expressionToString(getCondition, inputRowType, getExpressionString))
+      .item("where", getExpressionString(
+        getCondition, inputRowType.getFieldNames.toList, None, preferExpressionFormat(pw)))
       .item("select", getRowType.getFieldNames.mkString(", "))
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalc.scala
index 3d03932..aefcd2f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalc.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCalc.scala
@@ -28,7 +28,6 @@ import org.apache.flink.table.planner.delegation.BatchPlanner
 import org.apache.flink.table.planner.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef, TraitUtil}
 import org.apache.flink.table.planner.plan.nodes.common.CommonCalc
 import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecNode}
-import org.apache.flink.table.planner.plan.utils.RelExplainUtil
 import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
 
 import org.apache.calcite.plan._
@@ -158,7 +157,7 @@ class BatchExecCalc(
 
     new OneInputTransformation(
       inputTransform,
-      RelExplainUtil.calcToString(calcProgram, getExpressionString),
+      getRelDetailedDescription,
       operator,
       BaseRowTypeInfo.of(outputType),
       inputTransform.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala
index b11a7a3..029c171 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecCorrelate.scala
@@ -188,7 +188,7 @@ class BatchExecCorrelate(
     val inputTransformation = getInputNodes.get(0).translateToPlan(planner)
       .asInstanceOf[Transformation[BaseRow]]
     val operatorCtx = CodeGeneratorContext(config)
-    CorrelateCodeGenerator.generateCorrelateTransformation(
+    val transformation = CorrelateCodeGenerator.generateCorrelateTransformation(
       config,
       operatorCtx,
       inputTransformation,
@@ -202,6 +202,8 @@ class BatchExecCorrelate(
       retainHeader = false,
       getExpressionString,
       "BatchExecCorrelate")
+    transformation.setName(getRelDetailedDescription)
+    transformation
   }
 
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecExpand.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecExpand.scala
index e6a9c8e..7c8ec3c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecExpand.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecExpand.scala
@@ -99,10 +99,9 @@ class BatchExecExpand(
       projects,
       opName = "BatchExpand")
 
-    val operatorName = s"BatchExecExpand: ${getRowType.getFieldList.map(_.getName).mkString(", ")}"
     new OneInputTransformation(
       inputTransform,
-      operatorName,
+      getRelDetailedDescription,
       operator,
       BaseRowTypeInfo.of(outputType),
       inputTransform.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecGroupAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecGroupAggregateBase.scala
index 2592ac1..502a932 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecGroupAggregateBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecGroupAggregateBase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch
 
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.functions.UserDefinedFunction
-import org.apache.flink.table.planner.plan.utils.{FlinkRelOptUtil, RelExplainUtil}
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
 import org.apache.flink.table.planner.utils.AggregatePhaseStrategy
 import org.apache.flink.table.planner.utils.TableConfigUtils.getAggPhaseStrategy
 
@@ -74,18 +74,6 @@ abstract class BatchExecGroupAggregateBase(
 
   def getAggCallToAggFunction: Seq[(AggregateCall, UserDefinedFunction)] = aggCallToAggFunction
 
-  def aggOperatorName(prefix: String): String = {
-    RelExplainUtil.aggOperatorName(
-      prefix,
-      grouping,
-      auxGrouping,
-      inputRowType,
-      outputRowType,
-      aggCallToAggFunction,
-      isMerge,
-      isFinal)
-  }
-
   protected def isEnforceTwoStageAgg: Boolean = {
     val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
     getAggPhaseStrategy(tableConfig) == AggregatePhaseStrategy.TWO_PHASE
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashAggregate.scala
index 481f387..eac9b02 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashAggregate.scala
@@ -147,9 +147,4 @@ class BatchExecHashAggregate(
   //~ ExecNode methods -----------------------------------------------------------
 
   override def getDamBehavior = DamBehavior.FULL_DAM
-
-  override def getOperatorName: String = {
-    val aggregateNamePrefix = if (isMerge) "Global" else "Complete"
-    aggOperatorName(aggregateNamePrefix + "HashAggregate")
-  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala
index 46e8efd..042df22 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashAggregateBase.scala
@@ -117,8 +117,6 @@ abstract class BatchExecHashAggregateBase(
     replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
   }
 
-  def getOperatorName: String
-
   override protected def translateToPlanInternal(
       planner: BatchPlanner): Transformation[BaseRow] = {
     val config = planner.getTableConfig
@@ -147,7 +145,7 @@ abstract class BatchExecHashAggregateBase(
     val operator = new CodeGenOperatorFactory[BaseRow](generatedOperator)
     val ret = new OneInputTransformation(
       input,
-      getOperatorName,
+      getRelDetailedDescription,
       operator,
       BaseRowTypeInfo.of(outputType),
       input.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashJoin.scala
index e029b0f..1d01f11 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashJoin.scala
@@ -265,7 +265,7 @@ class BatchExecHashJoin(
     val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
       build,
       probe,
-      getOperatorName,
+      getRelDetailedDescription,
       operator,
       BaseRowTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType)),
       probe.getParallelism)
@@ -273,14 +273,4 @@ class BatchExecHashJoin(
     ret.setResources(resource, resource)
     ret
   }
-
-  private def getOperatorName: String = {
-    val joinExpressionStr = if (getCondition != null) {
-      val inFields = inputRowType.getFieldNames.toList
-      s"where: ${getExpressionString(getCondition, inFields, None, ExpressionFormat.Infix)}, "
-    } else {
-      ""
-    }
-    s"HashJoin($joinExpressionStr${if (leftIsBuild) "buildLeft" else "buildRight"})"
-  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala
index 8e19c75..840a2e5 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregate.scala
@@ -91,8 +91,4 @@ class BatchExecHashWindowAggregate(
 
   override def getDamBehavior: DamBehavior = DamBehavior.FULL_DAM
 
-  override def getOperatorName: String = {
-    val aggregateNamePrefix = if (isMerge) "Global" else "Complete"
-    aggregateNamePrefix + "WindowHashAggregateBatchExec"
-  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
index 6074157..de0b97b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecHashWindowAggregateBase.scala
@@ -116,8 +116,6 @@ abstract class BatchExecHashWindowAggregateBase(
     replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
   }
 
-  def getOperatorName: String
-
   override protected def translateToPlanInternal(
       planner: BatchPlanner): Transformation[BaseRow] = {
     val config = planner.getTableConfig
@@ -149,7 +147,7 @@ abstract class BatchExecHashWindowAggregateBase(
     val operator = new CodeGenOperatorFactory[BaseRow](generatedOperator)
     val ret = new OneInputTransformation(
       input,
-      getOperatorName,
+      getRelDetailedDescription,
       operator,
       BaseRowTypeInfo.of(outputType),
       input.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLimit.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLimit.scala
index 6b4ee94..ed64648 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLimit.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLimit.scala
@@ -108,14 +108,9 @@ class BatchExecLimit(
     val operator = new LimitOperator(isGlobal, limitStart, limitEnd)
     new OneInputTransformation(
       input,
-      getOperatorName,
+      getRelDetailedDescription,
       operator,
       inputType,
       input.getParallelism)
   }
-
-  private def getOperatorName = {
-    val prefix = if (isGlobal) "Global" else "Local"
-    s"${prefix}Limit(offset: $limitStart, fetch: ${fetchToString(fetch)})"
-  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashAggregate.scala
index b48ba36..54d0544 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashAggregate.scala
@@ -127,6 +127,4 @@ class BatchExecLocalHashAggregate(
   override def getDamBehavior: DamBehavior = {
     if (grouping.length == 0) DamBehavior.FULL_DAM else DamBehavior.MATERIALIZING
   }
-
-  override def getOperatorName: String = aggOperatorName("LocalHashAggregate")
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala
index aac9dc6..22162bf 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalHashWindowAggregate.scala
@@ -86,8 +86,4 @@ class BatchExecLocalHashWindowAggregate(
   //~ ExecNode methods -----------------------------------------------------------
 
   override def getDamBehavior: DamBehavior = DamBehavior.MATERIALIZING
-
-  override def getOperatorName: String = {
-    "LocalWindowHashAggregateBatchExec"
-  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortAggregate.scala
index b615daa..c4078cb 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortAggregate.scala
@@ -135,7 +135,4 @@ class BatchExecLocalSortAggregate(
   override def getDamBehavior: DamBehavior = {
     if (grouping.length == 0) DamBehavior.FULL_DAM else DamBehavior.MATERIALIZING
   }
-
-  override def getOperatorName: String = aggOperatorName("LocalSortAggregate")
-
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala
index 02e2345..1f2482f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecLocalSortWindowAggregate.scala
@@ -86,6 +86,4 @@ class BatchExecLocalSortWindowAggregate(
   //~ ExecNode methods -----------------------------------------------------------
 
   override def getDamBehavior: DamBehavior = DamBehavior.MATERIALIZING
-
-  override def getOperatorName: String = "LocalSortWindowAggregateBatchExec"
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala
index db30712..8345e1d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecNestedLoopJoin.scala
@@ -165,22 +165,11 @@ class BatchExecNestedLoopJoin(
     val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
       lInput,
       rInput,
-      getOperatorName,
+      getRelDetailedDescription,
       op,
       BaseRowTypeInfo.of(outputType),
       parallelism)
     ret.setResources(resourceSpec, resourceSpec)
     ret
   }
-
-  private def getOperatorName: String = {
-    val joinExpressionStr = if (getCondition != null) {
-      val inFields = inputRowType.getFieldNames.toList
-      s"where: ${getExpressionString(getCondition, inFields, None, ExpressionFormat.Infix)}, "
-    } else {
-      ""
-    }
-    s"NestedLoopJoin($joinExpressionStr${if (leftIsBuild) "buildLeft" else "buildRight"})"
-  }
-
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecOverAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecOverAggregate.scala
index abe140a..2baced1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecOverAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecOverAggregate.scala
@@ -421,7 +421,11 @@ class BatchExecOverAggregate(
     }
     val resource = NodeResourceUtil.fromManagedMem(managedMemoryInMB)
     val ret = new OneInputTransformation(
-      input, "OverAggregate", operator, BaseRowTypeInfo.of(outputType), input.getParallelism)
+      input,
+      getRelDetailedDescription,
+      operator,
+      BaseRowTypeInfo.of(outputType),
+      input.getParallelism)
     ret.setResources(resource, resource)
     ret
   }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecRank.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecRank.scala
index 52d237f..15c2bea 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecRank.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecRank.scala
@@ -288,17 +288,9 @@ class BatchExecRank(
 
     new OneInputTransformation(
       input,
-      getOperatorName,
+      getRelDetailedDescription,
       operator,
       BaseRowTypeInfo.of(outputType),
       input.getParallelism)
   }
-
-  private def getOperatorName: String = {
-    if (isGlobal) {
-      "GlobalRank"
-    } else {
-      "LocalRank"
-    }
-  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSort.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSort.scala
index 66df094..6c4ec67 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSort.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSort.scala
@@ -128,7 +128,7 @@ class BatchExecSort(
 
     val ret = new OneInputTransformation(
       input,
-      s"Sort(${RelExplainUtil.collationToString(sortCollation, getRowType)})",
+      getRelDetailedDescription,
       operator.asInstanceOf[OneInputStreamOperator[BaseRow, BaseRow]],
       BaseRowTypeInfo.of(outputType),
       input.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortAggregate.scala
index 9f9e665..02167a5 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortAggregate.scala
@@ -161,10 +161,4 @@ class BatchExecSortAggregate(
   override def getDamBehavior: DamBehavior = {
     if (grouping.length == 0) DamBehavior.FULL_DAM else DamBehavior.PIPELINED
   }
-
-  override def getOperatorName: String = {
-    val aggregateNamePrefix = if (isMerge) "Global" else "Complete"
-    aggOperatorName(aggregateNamePrefix + "SortAggregate")
-  }
-
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortAggregateBase.scala
index 80f56f2..9fd3a3c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortAggregateBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortAggregateBase.scala
@@ -99,8 +99,6 @@ abstract class BatchExecSortAggregateBase(
     replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
   }
 
-  def getOperatorName: String
-
   override protected def translateToPlanInternal(
       planner: BatchPlanner): Transformation[BaseRow] = {
     val input = getInputNodes.get(0).translateToPlan(planner)
@@ -122,7 +120,7 @@ abstract class BatchExecSortAggregateBase(
     val operator = new CodeGenOperatorFactory[BaseRow](generatedOperator)
     new OneInputTransformation(
       input,
-      getOperatorName,
+      getRelDetailedDescription,
       operator,
       BaseRowTypeInfo.of(outputType),
       input.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortLimit.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortLimit.scala
index 5426fb8..37635b2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortLimit.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortLimit.scala
@@ -143,16 +143,9 @@ class BatchExecSortLimit(
 
     new OneInputTransformation(
       input,
-      getOperatorName,
+      getRelDetailedDescription,
       operator,
       inputType,
       input.getParallelism)
   }
-
-  private def getOperatorName = {
-    s"${if (isGlobal) "Global" else "Local"}SortLimit(" +
-        s"orderBy: [${RelExplainUtil.collationToString(sortCollation, getRowType)}], " +
-        s"offset: $limitStart, " +
-        s"fetch: ${RelExplainUtil.fetchToString(fetch)})"
-  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
index c15b6f7..c8c9b2d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortMergeJoin.scala
@@ -265,7 +265,7 @@ class BatchExecSortMergeJoin(
     val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
       leftInput,
       rightInput,
-      getOperatorName,
+      getRelDetailedDescription,
       operator,
       BaseRowTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType)),
       rightInput.getParallelism)
@@ -278,12 +278,4 @@ class BatchExecSortMergeJoin(
     val mq = relNode.getCluster.getMetadataQuery
     mq.getAverageRowSize(relNode) * mq.getRowCount(relNode)
   }
-
-  private def getOperatorName: String = if (getCondition != null) {
-    val inFields = inputRowType.getFieldNames.toList
-    s"SortMergeJoin(where: ${
-      getExpressionString(getCondition, inFields, None, ExpressionFormat.Infix)})"
-  } else {
-    "SortMergeJoin"
-  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala
index fba66df..a472999 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregate.scala
@@ -88,9 +88,4 @@ class BatchExecSortWindowAggregate(
   //~ ExecNode methods -----------------------------------------------------------
 
   override def getDamBehavior: DamBehavior = DamBehavior.PIPELINED
-
-  override def getOperatorName: String = {
-    val aggregateNamePrefix = if (isMerge) "Global" else "Complete"
-    aggregateNamePrefix + "WindowSortAggregateBatchExec"
-  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
index 8c621d5..1e1111e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecSortWindowAggregateBase.scala
@@ -105,8 +105,6 @@ abstract class BatchExecSortWindowAggregateBase(
     replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
   }
 
-  def getOperatorName: String
-
   override protected def translateToPlanInternal(
       planner: BatchPlanner): Transformation[BaseRow] = {
     val input = getInputNodes.get(0).translateToPlan(planner)
@@ -137,7 +135,7 @@ abstract class BatchExecSortWindowAggregateBase(
     val operator = new CodeGenOperatorFactory[BaseRow](generatedOperator)
     new OneInputTransformation(
       input,
-      getOperatorName,
+      getRelDetailedDescription,
       operator,
       BaseRowTypeInfo.of(outputType),
       input.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecValues.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecValues.scala
index e1e13ea..8abba12 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecValues.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchExecValues.scala
@@ -80,6 +80,7 @@ class BatchExecValues(
       getRelTypeName)
     val transformation = planner.getExecEnv.createInput(inputFormat,
       inputFormat.getProducedType).getTransformation
+    transformation.setName(getRelDetailedDescription)
     transformation.setParallelism(1)
     transformation
   }
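
BatchExecValues is one of the "name not set" cases from the commit title: createInput does not
take an operator name, so the name is applied to the resulting Transformation afterwards
(mirrors the hunk above):

    val transformation = planner.getExecEnv
      .createInput(inputFormat, inputFormat.getProducedType)
      .getTransformation
    transformation.setName(getRelDetailedDescription)  // name set after creation
    transformation.setParallelism(1)
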
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalc.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalc.scala
index a249436..dc98107 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalc.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCalc.scala
@@ -119,7 +119,7 @@ class StreamExecCalc(
     )
     val ret = new OneInputTransformation(
       inputTransform,
-      RelExplainUtil.calcToString(calcProgram, getExpressionString),
+      getRelDetailedDescription,
       substituteStreamOperator,
       BaseRowTypeInfo.of(outputType),
       inputTransform.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelate.scala
index ae5788a..0675669 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecCorrelate.scala
@@ -135,6 +135,7 @@ class StreamExecCorrelate(
       retainHeader = true,
       getExpressionString,
       "StreamExecCorrelate")
+    transform.setName(getRelDetailedDescription)
     if (inputsContainSingleton()) {
       transform.setParallelism(1)
       transform.setMaxParallelism(1)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDeduplicate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDeduplicate.scala
index 888ee6e..d43c223 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDeduplicate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecDeduplicate.scala
@@ -82,8 +82,9 @@ class StreamExecDeduplicate(
 
   override def explainTerms(pw: RelWriter): RelWriter = {
     val fieldNames = getRowType.getFieldNames
+    val keep = if (keepLastRow) "LastRow" else "FirstRow"
     super.explainTerms(pw)
-      .item("keepLastRow", keepLastRow)
+      .item("keep", keep)
       .item("key", uniqueKeys.map(fieldNames.get).mkString(", "))
       .item("order", "PROCTIME")
   }
@@ -141,7 +142,7 @@ class StreamExecDeduplicate(
     }
     val ret = new OneInputTransformation(
       inputTransform,
-      getOperatorName,
+      getRelDetailedDescription,
       operator,
       rowTypeInfo,
       inputTransform.getParallelism)
@@ -156,12 +157,4 @@ class StreamExecDeduplicate(
     ret.setStateKeyType(selector.getProducedType)
     ret
   }
-
-  private def getOperatorName: String = {
-    val fieldNames = getRowType.getFieldNames
-    val keyNames = uniqueKeys.map(fieldNames.get).mkString(", ")
-    s"${if (keepLastRow) "keepLastRow" else "KeepFirstRow"}" +
-      s": (key: ($keyNames), select: (${fieldNames.mkString(", ")}), order: (PROCTIME)"
-  }
-
 }
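
Besides switching the operator name to the detailed description, Deduplicate's plan item
changes from the boolean keepLastRow=[true|false] to the more readable keep=[LastRow|FirstRow].
The removed hand-built name also had inconsistent casing ("keepLastRow" vs "KeepFirstRow") and
an unbalanced parenthesis, which the generated description avoids. A hypothetical plan line
after the change:

    // Deduplicate(keep=[LastRow], key=[a], order=[PROCTIME])  (hypothetical)
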
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecExpand.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecExpand.scala
index ec113a4..3abb0b9 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecExpand.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecExpand.scala
@@ -94,10 +94,9 @@ class StreamExecExpand(
       opName = "StreamExpand",
       retainHeader = true)
 
-    val operatorName = s"StreamExecExpand: ${getRowType.getFieldList.map(_.getName).mkString(", ")}"
     val transform = new OneInputTransformation(
       inputTransform,
-      operatorName,
+      getRelDetailedDescription,
       operator,
       BaseRowTypeInfo.of(outputType),
       inputTransform.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala
index ba27a5a..8c1e1cd 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGlobalGroupAggregate.scala
@@ -186,7 +186,7 @@ class StreamExecGlobalGroupAggregate(
     // partitioned aggregation
     val ret = new OneInputTransformation(
       inputTransformation,
-      "GlobalGroupAggregate",
+      getRelDetailedDescription,
       operator,
       BaseRowTypeInfo.of(outRowType),
       inputTransformation.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupAggregate.scala
index 676d05a..b922c86 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupAggregate.scala
@@ -203,7 +203,7 @@ class StreamExecGroupAggregate(
     // partitioned aggregation
     val ret = new OneInputTransformation(
       inputTransformation,
-      "GroupAggregate",
+      getRelDetailedDescription,
       operator,
       BaseRowTypeInfo.of(outRowType),
       inputTransformation.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala
index 212b01a..df6f95b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupWindowAggregate.scala
@@ -218,17 +218,9 @@ class StreamExecGroupWindowAggregate(
       inputRowTypeInfo.getLogicalTypes,
       timeIdx)
 
-    val operatorName = if (grouping.nonEmpty) {
-      s"window: ($window), " +
-        s"groupBy: (${RelExplainUtil.fieldToString(grouping, inputRowType)}), " +
-        s"select: ($aggString)"
-    } else {
-      s"window: ($window), select: ($aggString)"
-    }
-
     val transformation = new OneInputTransformation(
       inputTransform,
-      operatorName,
+      getRelDetailedDescription,
       operator,
       outRowType,
       inputTransform.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala
index ae52f71..5c7c9eb 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecIncrementalGroupAggregate.scala
@@ -183,7 +183,7 @@ class StreamExecIncrementalGroupAggregate(
     // partitioned aggregation
     val ret = new OneInputTransformation(
       inputTransformation,
-      "IncrementalGroupAggregate",
+      getRelDetailedDescription,
       operator,
       BaseRowTypeInfo.of(outRowType),
       inputTransformation.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecJoin.scala
index 77a8e09..eb989be 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecJoin.scala
@@ -194,7 +194,7 @@ class StreamExecJoin(
     val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
       leftTransform,
       rightTransform,
-      getJoinOperatorName(),
+      getRelDetailedDescription,
       operator,
       returnType,
       leftTransform.getParallelism)
@@ -249,11 +249,4 @@ class StreamExecJoin(
     }
     smallest
   }
-
-  private def getJoinOperatorName(): String = {
-    val where = RelExplainUtil.expressionToString(getCondition, inputRowType, getExpressionString)
-    val select = getRowType.getFieldNames.mkString(", ")
-    s"${flinkJoinType.toString}(where: ($where), select: ($select), " +
-      s"leftInputSpec: ${analyzeJoinInput(left)}, rightInputSpec: ${analyzeJoinInput(right)})"
-  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLimit.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLimit.scala
index 98b4a02..b25d607 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLimit.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLimit.scala
@@ -167,7 +167,7 @@ class StreamExecLimit(
     // as input node is singleton exchange, its parallelism is 1.
     val ret = new OneInputTransformation(
       inputTransform,
-      s"Limit(offset: $limitStart, fetch: ${fetchToString(fetch)})",
+      getRelDetailedDescription,
       operator,
       outputRowTypeInfo,
       inputTransform.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLocalGroupAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLocalGroupAggregate.scala
index c898881..f128c60 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLocalGroupAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecLocalGroupAggregate.scala
@@ -146,7 +146,7 @@ class StreamExecLocalGroupAggregate(
 
     val transformation = new OneInputTransformation(
       inputTransformation,
-      "LocalGroupAggregate",
+      getRelDetailedDescription,
       operator,
       BaseRowTypeInfo.of(outRowType),
       inputTransformation.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecMatch.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecMatch.scala
index 6c07531..8415da3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecMatch.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecMatch.scala
@@ -231,7 +231,7 @@ class StreamExecMatch(
       val outputRowTypeInfo = BaseRowTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType))
       val transformation = new OneInputTransformation[BaseRow, BaseRow](
         timestampedInput,
-        toString,
+        getRelDetailedDescription,
         operator,
         outputRowTypeInfo,
         timestampedInput.getParallelism
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecOverAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecOverAggregate.scala
index c957110..eae561a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecOverAggregate.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecOverAggregate.scala
@@ -282,7 +282,7 @@ class StreamExecOverAggregate(
 
     val ret = new OneInputTransformation(
       inputDS,
-      "OverAggregate",
+      getRelDetailedDescription,
       operator,
       returnTypeInfo,
       inputDS.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecRank.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecRank.scala
index d227dd6..08717d0 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecRank.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecRank.scala
@@ -204,7 +204,6 @@ class StreamExecRank(
           generateRetraction,
           outputRankNumber)
     }
-    val rankOpName = getOperatorName
     val operator = new KeyedProcessOperator(processFunction)
     processFunction.setKeyContext(operator)
     val inputTransform = getInputNodes.get(0).translateToPlan(planner)
@@ -213,7 +212,7 @@ class StreamExecRank(
       FlinkTypeFactory.toLogicalRowType(getRowType))
     val ret = new OneInputTransformation(
       inputTransform,
-      rankOpName,
+      getRelDetailedDescription,
       operator,
       outputRowTypeInfo,
       inputTransform.getParallelism)
@@ -229,19 +228,6 @@ class StreamExecRank(
     ret.setStateKeyType(selector.getProducedType)
     ret
   }
-
-  private def getOperatorName: String = {
-    val inputRowType = inputRel.getRowType
-    var result = getStrategy().toString
-    result += s"(orderBy: (${RelExplainUtil.collationToString(orderKey, inputRowType)})"
-    if (partitionKey.nonEmpty) {
-      val partitionKeys = partitionKey.toArray
-      result += s", partitionBy: (${RelExplainUtil.fieldToString(partitionKeys, inputRowType)})"
-    }
-    result += s", ${getRowType.getFieldNames.mkString(", ")}"
-    result += s", ${rankRange.toString(inputRowType.getFieldNames)})"
-    result
-  }
 }
 object StreamExecRank {
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSort.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSort.scala
index 3a7c927..f213253 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSort.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSort.scala
@@ -132,7 +132,7 @@ class StreamExecSort(
     // as input node is singleton exchange, its parallelism is 1.
     val ret = new OneInputTransformation(
       input,
-      s"Sort(${RelExplainUtil.collationToString(sortCollation, getRowType)})",
+      getRelDetailedDescription,
       sortOperator,
       outputRowTypeInfo,
       input.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSortLimit.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSortLimit.scala
index b34975b..0a48d55 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSortLimit.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecSortLimit.scala
@@ -214,7 +214,7 @@ class StreamExecSortLimit(
 
     val ret = new OneInputTransformation(
       inputTransform,
-      getOperatorName,
+      getRelDetailedDescription,
       operator,
       outputRowTypeInfo,
       inputTransform.getParallelism)
@@ -229,11 +229,4 @@ class StreamExecSortLimit(
     ret.setStateKeyType(selector.getProducedType)
     ret
   }
-
-  private def getOperatorName = {
-    s"SortLimit(" +
-      s"orderBy: [${RelExplainUtil.collationToString(sortCollation, getRowType)}], " +
-      s"offset: $limitStart, " +
-      s"fetch: ${RelExplainUtil.fetchToString(fetch)})"
-  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
index 9002452..4e5e327 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalJoin.scala
@@ -140,7 +140,7 @@ class StreamExecTemporalJoin(
     val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
       leftTransform,
       rightTransform,
-      getJoinOperatorName,
+      getRelDetailedDescription,
       joinOperator,
       BaseRowTypeInfo.of(returnType),
       leftTransform.getParallelism)
@@ -176,12 +176,6 @@ class StreamExecTemporalJoin(
       }
     })
   }
-
-  private def getJoinOperatorName: String = {
-    val where = RelExplainUtil.expressionToString(getCondition, inputRowType, getExpressionString)
-    val select = getRowType.getFieldNames.mkString(", ")
-    s"TemporalTableJoin(where: ($where), select: ($select)"
-  }
 }
 
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalSort.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalSort.scala
index 030c483..aac4a7d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalSort.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTemporalSort.scala
@@ -147,7 +147,7 @@ class StreamExecTemporalSort(
       // as input node is singleton exchange, its parallelism is 1.
       val ret = new OneInputTransformation(
         input,
-        "ProcTimeSortOperator",
+        getRelDetailedDescription,
         sortOperator,
         outputRowTypeInfo,
         input.getParallelism)
@@ -187,7 +187,7 @@ class StreamExecTemporalSort(
 
     val ret = new OneInputTransformation(
       input,
-      "RowTimeSortOperator",
+      getRelDetailedDescription,
       sortOperator,
       outputRowTypeInfo,
       input.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecValues.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecValues.scala
index a95558b..6ec1d1f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecValues.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecValues.scala
@@ -82,6 +82,7 @@ class StreamExecValues(
       getRelTypeName)
     val transformation = planner.getExecEnv.createInput(inputFormat,
       inputFormat.getProducedType).getTransformation
+    transformation.setName(getRelDetailedDescription)
     transformation.setParallelism(1)
     transformation.setMaxParallelism(1)
     transformation
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecWatermarkAssigner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecWatermarkAssigner.scala
index 0db10e4..871d54e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecWatermarkAssigner.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecWatermarkAssigner.scala
@@ -119,39 +119,31 @@ class StreamExecWatermarkAssigner(
     val idleTimeout = getMillisecondFromConfigDuration(config,
       ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT)
 
-    val (operator, opName) = if (inferredInterval.mode == MiniBatchMode.None ||
+    val operator = if (inferredInterval.mode == MiniBatchMode.None ||
       inferredInterval.interval == 0) {
       require(rowtimeFieldIndex.isDefined, "rowtimeFieldIndex should not be None")
       require(watermarkDelay.isDefined, "watermarkDelay should not be None")
       // 1. redundant watermark definition in DDL
       // 2. existing window aggregate
       // 3. operator requiring watermark, but minibatch is not enabled
-      val op = new WatermarkAssignerOperator(rowtimeFieldIndex.get, watermarkDelay.get, idleTimeout)
-      val opName =
-        s"WatermarkAssigner(rowtime: ${rowtimeFieldIndex.get}, offset: ${watermarkDelay.get})"
-      (op, opName)
+      new WatermarkAssignerOperator(rowtimeFieldIndex.get, watermarkDelay.get, idleTimeout)
     } else if (inferredInterval.mode == MiniBatchMode.ProcTime) {
-      val op = new MiniBatchAssignerOperator(inferredInterval.interval)
-      val opName = s"MiniBatchAssigner(intervalMs: ${inferredInterval.interval})"
-      (op, opName)
+      new MiniBatchAssignerOperator(inferredInterval.interval)
     } else {
       require(rowtimeFieldIndex.isDefined, "rowtimeFieldIndex should not be None")
       require(watermarkDelay.isDefined, "watermarkDelay should not be None")
-      val op = new MiniBatchedWatermarkAssignerOperator(
+      new MiniBatchedWatermarkAssignerOperator(
         rowtimeFieldIndex.get,
         watermarkDelay.get,
         0,
         idleTimeout,
         inferredInterval.interval)
-      val opName = s"MiniBatchedWatermarkAssigner(rowtime: ${rowtimeFieldIndex.get}," +
-        s" offset: ${watermarkDelay.get}, intervalMs: ${inferredInterval.interval})"
-      (op, opName)
     }
 
     val outputRowTypeInfo = BaseRowTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType))
     val transformation = new OneInputTransformation[BaseRow, BaseRow](
       inputTransformation,
-      opName,
+      getRelDetailedDescription,
       operator,
       outputRowTypeInfo,
       inputTransformation.getParallelism)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecWindowJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecWindowJoin.scala
index d1127cc..2b09625 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecWindowJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecWindowJoin.scala
@@ -29,7 +29,8 @@ import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.delegation.StreamPlanner
 import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, StreamExecNode}
-import org.apache.flink.table.planner.plan.utils.{JoinTypeUtil, KeySelectorUtil, RelExplainUtil, UpdatingPlanChecker, WindowJoinUtil}
+import org.apache.flink.table.planner.plan.utils.{JoinTypeUtil, KeySelectorUtil, UpdatingPlanChecker, WindowJoinUtil}
+import org.apache.flink.table.planner.plan.utils.RelExplainUtil.preferExpressionFormat
 import org.apache.flink.table.runtime.generated.GeneratedFunction
 import org.apache.flink.table.runtime.operators.join.{FlinkJoinType, KeyedCoProcessOperatorWithWatermarkDelay, OuterJoinPaddingUtil, ProcTimeBoundedStreamJoin, RowTimeBoundedStreamJoin}
 import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
@@ -105,8 +106,8 @@ class StreamExecWindowJoin(
     super.explainTerms(pw)
       .item("joinType", flinkJoinType.toString)
       .item("windowBounds", windowBounds)
-      .item("where",
-        RelExplainUtil.expressionToString(joinCondition, outputRowType, getExpressionString))
+      .item("where", getExpressionString(
+        joinCondition, outputRowType.getFieldNames.toList, None, preferExpressionFormat(pw)))
       .item("select", getRowType.getFieldNames.mkString(", "))
   }
 
@@ -294,7 +295,7 @@ class StreamExecWindowJoin(
     val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
       leftPlan,
       rightPlan,
-      "Co-Process",
+      getRelDetailedDescription,
       new LegacyKeyedCoProcessOperator(procJoinFunc).
         asInstanceOf[TwoInputStreamOperator[BaseRow,BaseRow,BaseRow]],
       returnTypeInfo,
@@ -338,7 +339,7 @@ class StreamExecWindowJoin(
     val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
       leftPlan,
       rightPlan,
-      "Co-Process",
+      getRelDetailedDescription,
       new KeyedCoProcessOperatorWithWatermarkDelay(rowJoinFunc, rowJoinFunc.getMaxOutputDelay)
         .asInstanceOf[TwoInputStreamOperator[BaseRow,BaseRow,BaseRow]],
       returnTypeInfo,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExecNodePlanDumper.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExecNodePlanDumper.scala
index 750cb56..34a4d9a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExecNodePlanDumper.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ExecNodePlanDumper.scala
@@ -82,7 +82,7 @@ object ExecNodePlanDumper {
     */
   def dagToString(
       nodes: Seq[ExecNode[_, _]],
-      detailLevel: SqlExplainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+      detailLevel: SqlExplainLevel = SqlExplainLevel.DIGEST_ATTRIBUTES,
       withExecNodeId: Boolean = false,
       withRetractTraits: Boolean = false,
       withOutputType: Boolean = false,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala
index 832f059..acf20ec 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala
@@ -70,7 +70,7 @@ object FlinkRelOptUtil {
     */
   def toString(
       rel: RelNode,
-      detailLevel: SqlExplainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+      detailLevel: SqlExplainLevel = SqlExplainLevel.DIGEST_ATTRIBUTES,
       withIdPrefix: Boolean = false,
       withRetractTraits: Boolean = false,
       withRowType: Boolean = false): String = {
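
[Editor's note] Both detail levels are existing org.apache.calcite.sql.SqlExplainLevel
constants; DIGEST_ATTRIBUTES prints every attribute that contributes to a node's digest,
which is what the detailed operator names above rely on. A hedged usage sketch —
`relNode` is a stand-in and not part of this patch:

    import org.apache.calcite.rel.RelNode
    import org.apache.calcite.sql.SqlExplainLevel
    import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil

    // Render a plan with the new default detail level; the remaining
    // parameters of FlinkRelOptUtil.toString keep their defaults.
    def explainDigest(relNode: RelNode): String =
      FlinkRelOptUtil.toString(relNode, SqlExplainLevel.DIGEST_ATTRIBUTES)
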
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala
index 6cf72aa..15511e8 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala
@@ -27,7 +27,7 @@ import org.apache.flink.table.planner.plan.nodes.ExpressionFormat
 import org.apache.flink.table.planner.plan.nodes.ExpressionFormat.ExpressionFormat
 
 import com.google.common.collect.ImmutableMap
-import org.apache.calcite.rel.RelCollation
+import org.apache.calcite.rel.{RelCollation, RelWriter}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.Window.Group
 import org.apache.calcite.rel.core.{AggregateCall, Window}
@@ -47,6 +47,19 @@ import scala.collection.mutable
 object RelExplainUtil {
 
   /**
+    * Returns the preferred [[ExpressionFormat]] for the given [[RelWriter]]. Prefix is used
+    * for traditional writers, while Infix is used for [[RelDescriptionWriterImpl]] because it
+    * is more readable. The [[RelDescriptionWriterImpl]] is mainly used to generate
+    * [[org.apache.flink.table.planner.plan.nodes.FlinkRelNode#getRelDetailedDescription()]].
+    */
+  def preferExpressionFormat(pw: RelWriter): ExpressionFormat = pw match {
+    // the infix format is more readable when displayed to users
+    case _: RelDescriptionWriterImpl => ExpressionFormat.Infix
+    // traditional writers prefer the prefix expression format, e.g. +(x, y)
+    case _ => ExpressionFormat.Prefix
+  }
+
+  /**
     * Converts field names corresponding to given indices to String.
     */
   def fieldToString(fieldIndices: Array[Int], inputType: RelDataType): String = {
@@ -574,33 +587,16 @@ object RelExplainUtil {
     buf.toString
   }
 
-  def calcToString(
-      calcProgram: RexProgram,
-      f: (RexNode, List[String], Option[List[RexNode]], ExpressionFormat) => String): String = {
-    val inFields = calcProgram.getInputRowType.getFieldNames.toList
-    val localExprs = calcProgram.getExprList.toList
-    val selectionStr = selectionToString(calcProgram, f, ExpressionFormat.Infix)
-    val cond = calcProgram.getCondition
-    val name = s"${
-      if (cond != null) {
-        s"where: ${
-          f(cond, inFields, Some(localExprs), ExpressionFormat.Infix)}, "
-      } else {
-        ""
-      }
-    }select: ($selectionStr)"
-    s"Calc($name)"
-  }
-
   def conditionToString(
       calcProgram: RexProgram,
-      f: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
+      f: (RexNode, List[String], Option[List[RexNode]], ExpressionFormat) => String,
+      expressionFormat: ExpressionFormat = ExpressionFormat.Prefix): String = {
     val cond = calcProgram.getCondition
     val inputFieldNames = calcProgram.getInputRowType.getFieldNames.toList
     val localExprs = calcProgram.getExprList.toList
 
     if (cond != null) {
-      f(cond, inputFieldNames, Some(localExprs))
+      f(cond, inputFieldNames, Some(localExprs), expressionFormat)
     } else {
       ""
     }
@@ -626,16 +622,6 @@ object RelExplainUtil {
     }.mkString(", ")
   }
 
-  def correlateOpName(
-      inputType: RelDataType,
-      rexCall: RexCall,
-      sqlFunction: TableSqlFunction,
-      rowType: RelDataType,
-      expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
-    s"correlate: ${correlateToString(inputType, rexCall, sqlFunction, expression)}," +
-        s" select: ${rowType.getFieldNames.mkString(",")}"
-  }
-
   def correlateToString(
       inputType: RelDataType,
       rexCall: RexCall,
@@ -647,39 +633,6 @@ object RelExplainUtil {
     s"table($udtfName($operands))"
   }
 
-  def aggOperatorName(
-      prefix: String,
-      grouping: Array[Int],
-      auxGrouping: Array[Int],
-      inputRowType: RelDataType,
-      outputRowType: RelDataType,
-      aggCallToAggFunction: Seq[(AggregateCall, UserDefinedFunction)],
-      isMerge: Boolean,
-      isFinal: Boolean): String = {
-    val groupingStr = if (grouping.nonEmpty) {
-      s"groupBy:(${fieldToString(grouping, inputRowType)}),"
-    } else {
-      ""
-    }
-    val auxGroupingStr = if (auxGrouping.nonEmpty) {
-      s"auxGrouping:(${fieldToString(auxGrouping, inputRowType)}),"
-    } else {
-      ""
-    }
-
-    val selectString = s"select:(${
-      groupAggregationToString(
-        inputRowType,
-        outputRowType,
-        grouping,
-        auxGrouping,
-        aggCallToAggFunction,
-        isMerge,
-        isFinal)
-    }),"
-    s"$prefix($groupingStr$auxGroupingStr$selectString)"
-  }
-
   def windowAggregationToString(
       inputType: RelDataType,
       grouping: Array[Int],
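
[Editor's note] For reference, the two ExpressionFormat variants render the same
predicate differently, as the updated test expectations below show (prefix `=(s3, s30)`
versus infix `(s3 = s30)`). A standalone sketch with stand-in types, not the planner's
actual renderer:

    object Format extends Enumeration { val Infix, Prefix = Value }

    // Render a binary predicate in either style.
    def render(op: String, l: String, r: String, fmt: Format.Value): String =
      fmt match {
        case Format.Infix  => s"($l $op $r)"   // e.g. (s3 = s30)
        case Format.Prefix => s"$op($l, $r)"   // e.g. =(s3, s30)
      }
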
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala
index 7c1013a..37f55a4 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ScanUtil.scala
@@ -131,7 +131,8 @@ object ScanUtil {
     * @param qualifiedName qualified name for table
     */
   private[flink] def getOperatorName(qualifiedName: Seq[String], rowType: RelDataType): String = {
-    val s = s"table:$qualifiedName, fields:(${rowType.getFieldNames.mkString(", ")})"
-    s"SourceConversion($s)"
+    val tableQualifiedName = qualifiedName.mkString(".")
+    val fieldNames = rowType.getFieldNames.mkString(", ")
+    s"SourceConversion(table=[$tableQualifiedName], fields=[$fieldNames])"
   }
 }
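
[Editor's note] The effect of the ScanUtil change above, restated as a self-contained
snippet (signature simplified to plain Seq[String] arguments; illustrative only):

    def sourceConversionName(qualifiedName: Seq[String], fieldNames: Seq[String]): String = {
      val table = qualifiedName.mkString(".")
      s"SourceConversion(table=[$table], fields=[${fieldNames.mkString(", ")}])"
    }

    // sourceConversionName(
    //   Seq("default_catalog", "default_database", "T1"),
    //   Seq("b1", "l2", "s3"))
    // == "SourceConversion(table=[default_catalog.default_database.T1], fields=[b1, l2, s3])"
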
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/explain/testGetStatsFromCatalog.out b/flink-table/flink-table-planner-blink/src/test/resources/explain/testGetStatsFromCatalog.out
index 223aeee..56d2ca8 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/explain/testGetStatsFromCatalog.out
+++ b/flink-table/flink-table-planner-blink/src/test/resources/explain/testGetStatsFromCatalog.out
@@ -16,17 +16,17 @@ HashJoin(joinType=[InnerJoin], where=[=(s3, s30)], select=[b1, l2, s3, d4, dd5,
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, T1, source: [TestTableSource(b1, l2, s3, d4, dd5)]), fields:(b1, l2, s3, d4, dd5))
+		content : SourceConversion(table=[default_catalog.default_database.T1, source: [TestTableSource(b1, l2, s3, d4, dd5)]], fields=[b1, l2, s3, d4, dd5])
 		ship_strategy : FORWARD
 
  : Data Source
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, T2, source: [TestTableSource(b1, l2, s3, d4, dd5)]), fields:(b1, l2, s3, d4, dd5))
+		content : SourceConversion(table=[default_catalog.default_database.T2, source: [TestTableSource(b1, l2, s3, d4, dd5)]], fields=[b1, l2, s3, d4, dd5])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : HashJoin(where: (s3 = s30), buildLeft)
+			content : HashJoin(joinType=[InnerJoin], where=[(s3 = s30)], select=[b1, l2, s3, d4, dd5, b10, l20, s30, d40, dd50], isBroadcast=[true], build=[left])
 			ship_strategy : BROADCAST
 
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/batch/ExplainTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/batch/ExplainTest.xml
index 8eda3a8..06d8b88 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/batch/ExplainTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/batch/ExplainTest.xml
@@ -37,23 +37,23 @@ Calc(select=[EXPR$0])
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : Calc(select: (a))
+			content : Calc(select=[a])
 			ship_strategy : FORWARD
 
 			 : Operator
-				content : LocalHashAggregate(groupBy:(a),select:(a, Partial_COUNT(*) AS count1$0),)
+				content : LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(*) AS count1$0])
 				ship_strategy : FORWARD
 
 				 : Operator
-					content : GlobalHashAggregate(groupBy:(a),select:(a, Final_COUNT(count1$0) AS EXPR$0),)
+					content : HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count1$0) AS EXPR$0])
 					ship_strategy : HASH[a]
 
 					 : Operator
-						content : Calc(select: (EXPR$0))
+						content : Calc(select=[EXPR$0])
 						ship_strategy : FORWARD
 
 ]]>
@@ -80,23 +80,23 @@ Calc(select=[EXPR$0]): rowcount = , cumulative cost = {rows, cpu, io, network, m
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : Calc(select: (a))
+			content : Calc(select=[a])
 			ship_strategy : FORWARD
 
 			 : Operator
-				content : LocalHashAggregate(groupBy:(a),select:(a, Partial_COUNT(*) AS count1$0),)
+				content : LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(*) AS count1$0])
 				ship_strategy : FORWARD
 
 				 : Operator
-					content : GlobalHashAggregate(groupBy:(a),select:(a, Final_COUNT(count1$0) AS EXPR$0),)
+					content : HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count1$0) AS EXPR$0])
 					ship_strategy : HASH[a]
 
 					 : Operator
-						content : Calc(select: (EXPR$0))
+						content : Calc(select=[EXPR$0])
 						ship_strategy : FORWARD
 
 ]]>
@@ -116,7 +116,7 @@ BoundedStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 ]]>
@@ -136,7 +136,7 @@ BoundedStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 ]]>
@@ -158,11 +158,11 @@ Calc(select=[a, b, c], where=[=(MOD(a, 2), 0)])
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : Calc(where: ((a MOD 2) = 0), select: (a, b, c))
+			content : Calc(select=[a, b, c], where=[((a MOD 2) = 0)])
 			ship_strategy : FORWARD
 
 ]]>
@@ -184,11 +184,11 @@ Calc(select=[a, b, c], where=[=(MOD(a, 2), 0)]): rowcount = , cumulative cost =
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : Calc(where: ((a MOD 2) = 0), select: (a, b, c))
+			content : Calc(select=[a, b, c], where=[((a MOD 2) = 0)])
 			ship_strategy : FORWARD
 
 ]]>
@@ -219,19 +219,19 @@ Calc(select=[a, b, c, e, f])
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable2), fields:(d, e, f))
+			content : SourceConversion(table=[default_catalog.default_database.MyTable2], fields=[d, e, f])
 			ship_strategy : FORWARD
 
 			 : Operator
-				content : SortMergeJoin(where: (a = d))
+				content : SortMergeJoin(joinType=[InnerJoin], where=[(a = d)], select=[a, b, c, d, e, f])
 				ship_strategy : HASH[a]
 
 				 : Operator
-					content : Calc(select: (a, b, c, e, f))
+					content : Calc(select=[a, b, c, e, f])
 					ship_strategy : FORWARD
 
 ]]>
@@ -262,19 +262,19 @@ Calc(select=[a, b, c, e, f]): rowcount = , cumulative cost = {rows, cpu, io, net
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable2), fields:(d, e, f))
+			content : SourceConversion(table=[default_catalog.default_database.MyTable2], fields=[d, e, f])
 			ship_strategy : FORWARD
 
 			 : Operator
-				content : SortMergeJoin(where: (a = d))
+				content : SortMergeJoin(joinType=[InnerJoin], where=[(a = d)], select=[a, b, c, d, e, f])
 				ship_strategy : HASH[a]
 
 				 : Operator
-					content : Calc(select: (a, b, c, e, f))
+					content : Calc(select=[a, b, c, e, f])
 					ship_strategy : FORWARD
 
 ]]>
@@ -317,23 +317,23 @@ Sink(name=[sink2], fields=[a, cnt])
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : Calc(select: (a))
+			content : Calc(select=[a])
 			ship_strategy : FORWARD
 
 			 : Operator
-				content : LocalHashAggregate(groupBy:(a),select:(a, Partial_COUNT(*) AS count1$0),)
+				content : LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(*) AS count1$0])
 				ship_strategy : FORWARD
 
 				 : Operator
-					content : GlobalHashAggregate(groupBy:(a),select:(a, Final_COUNT(count1$0) AS cnt),)
+					content : HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count1$0) AS cnt])
 					ship_strategy : HASH[a]
 
 					 : Operator
-						content : Calc(where: (cnt > 10), select: (a, cnt))
+						content : Calc(select=[a, cnt], where=[(cnt > 10)])
 						ship_strategy : FORWARD
 
 						 : Operator
@@ -341,7 +341,7 @@ Sink(name=[sink2], fields=[a, cnt])
 							ship_strategy : FORWARD
 
 							 : Operator
-								content : Calc(where: (cnt < 10), select: (a, cnt))
+								content : Calc(select=[a, cnt], where=[(cnt < 10)])
 								ship_strategy : FORWARD
 
 								 : Operator
@@ -396,23 +396,23 @@ Sink(name=[sink2], fields=[a, cnt]): rowcount = , cumulative cost = {rows, cpu,
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : Calc(select: (a))
+			content : Calc(select=[a])
 			ship_strategy : FORWARD
 
 			 : Operator
-				content : LocalHashAggregate(groupBy:(a),select:(a, Partial_COUNT(*) AS count1$0),)
+				content : LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(*) AS count1$0])
 				ship_strategy : FORWARD
 
 				 : Operator
-					content : GlobalHashAggregate(groupBy:(a),select:(a, Final_COUNT(count1$0) AS cnt),)
+					content : HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count1$0) AS cnt])
 					ship_strategy : HASH[a]
 
 					 : Operator
-						content : Calc(where: (cnt > 10), select: (a, cnt))
+						content : Calc(select=[a, cnt], where=[(cnt > 10)])
 						ship_strategy : FORWARD
 
 						 : Operator
@@ -420,7 +420,7 @@ Sink(name=[sink2], fields=[a, cnt]): rowcount = , cumulative cost = {rows, cpu,
 							ship_strategy : FORWARD
 
 							 : Operator
-								content : Calc(where: (cnt < 10), select: (a, cnt))
+								content : Calc(select=[a, cnt], where=[(cnt < 10)])
 								ship_strategy : FORWARD
 
 								 : Operator
@@ -456,11 +456,11 @@ Sink(name=[sink], fields=[a, b, c])
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : Calc(where: (a > 10), select: (a, b, c))
+			content : Calc(select=[a, b, c], where=[(a > 10)])
 			ship_strategy : FORWARD
 
 			 : Operator
@@ -492,11 +492,11 @@ Sink(name=[sink], fields=[a, b, c]): rowcount = , cumulative cost = {rows, cpu,
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : Calc(where: (a > 10), select: (a, b, c))
+			content : Calc(select=[a, b, c], where=[(a > 10)])
 			ship_strategy : FORWARD
 
 			 : Operator
@@ -528,15 +528,15 @@ SortLimit(orderBy=[a ASC], offset=[0], fetch=[5], global=[true])
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : LocalSortLimit(orderBy: [a ASC], offset: 0, fetch: 5)
+			content : SortLimit(orderBy=[a ASC], offset=[0], fetch=[5], global=[false])
 			ship_strategy : FORWARD
 
 			 : Operator
-				content : GlobalSortLimit(orderBy: [a ASC], offset: 0, fetch: 5)
+				content : SortLimit(orderBy=[a ASC], offset=[0], fetch=[5], global=[true])
 				ship_strategy : GLOBAL
 
 ]]>
@@ -560,15 +560,15 @@ SortLimit(orderBy=[a ASC], offset=[0], fetch=[5], global=[true]): rowcount = , c
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : LocalSortLimit(orderBy: [a ASC], offset: 0, fetch: 5)
+			content : SortLimit(orderBy=[a ASC], offset=[0], fetch=[5], global=[false])
 			ship_strategy : FORWARD
 
 			 : Operator
-				content : GlobalSortLimit(orderBy: [a ASC], offset: 0, fetch: 5)
+				content : SortLimit(orderBy=[a ASC], offset=[0], fetch=[5], global=[true])
 				ship_strategy : GLOBAL
 
 ]]>
@@ -588,7 +588,7 @@ TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [Tes
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [TestTableSource(a, b, c)]], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 ]]>
@@ -608,7 +608,7 @@ TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [Tes
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [TestTableSource(a, b, c)]], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 ]]>
@@ -636,11 +636,11 @@ Union(all=[true], union=[a, b, c]): rowcount = , cumulative cost = {rows, cpu, i
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable2), fields:(d, e, f))
+			content : SourceConversion(table=[default_catalog.default_database.MyTable2], fields=[d, e, f])
 			ship_strategy : FORWARD
 
 ]]>
@@ -668,11 +668,11 @@ Union(all=[true], union=[a, b, c])
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable2), fields:(d, e, f))
+			content : SourceConversion(table=[default_catalog.default_database.MyTable2], fields=[d, e, f])
 			ship_strategy : FORWARD
 
 ]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
index 16d3278..f50b17e 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
@@ -30,7 +30,7 @@ DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a,
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 ]]>
@@ -56,19 +56,19 @@ Calc(select=[EXPR$0], updateAsRetraction=[false], accMode=[Acc]): rowcount = , c
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : Calc(select: (a))
+			content : Calc(select=[a])
 			ship_strategy : FORWARD
 
 			 : Operator
-				content : GroupAggregate
+				content : GroupAggregate(groupBy=[a], select=[a, COUNT(*) AS EXPR$0])
 				ship_strategy : HASH
 
 				 : Operator
-					content : Calc(select: (EXPR$0))
+					content : Calc(select=[EXPR$0])
 					ship_strategy : FORWARD
 
 ]]>
@@ -88,7 +88,7 @@ DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[a,
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 ]]>
@@ -108,7 +108,7 @@ TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [Tes
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [TestTableSource(a, b, c)]], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 ]]>
@@ -128,7 +128,7 @@ TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [Tes
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable, source: [TestTableSource(a, b, c)]], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 ]]>
@@ -154,19 +154,19 @@ Calc(select=[EXPR$0])
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : Calc(select: (a))
+			content : Calc(select=[a])
 			ship_strategy : FORWARD
 
 			 : Operator
-				content : GroupAggregate
+				content : GroupAggregate(groupBy=[a], select=[a, COUNT(*) AS EXPR$0])
 				ship_strategy : HASH
 
 				 : Operator
-					content : Calc(select: (EXPR$0))
+					content : Calc(select=[EXPR$0])
 					ship_strategy : FORWARD
 
 ]]>
@@ -188,11 +188,11 @@ Calc(select=[a, b, c], where=[=(MOD(a, 2), 0)])
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : Calc(where: ((a MOD 2) = 0), select: (a, b, c))
+			content : Calc(select=[a, b, c], where=[((a MOD 2) = 0)])
 			ship_strategy : FORWARD
 
 ]]>
@@ -214,11 +214,11 @@ Calc(select=[a, b, c], where=[=(MOD(a, 2), 0)], updateAsRetraction=[false], accM
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : Calc(where: ((a MOD 2) = 0), select: (a, b, c))
+			content : Calc(select=[a, b, c], where=[((a MOD 2) = 0)])
 			ship_strategy : FORWARD
 
 ]]>
@@ -249,19 +249,19 @@ Calc(select=[a, b, c, e, f])
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable2), fields:(d, e, f))
+			content : SourceConversion(table=[default_catalog.default_database.MyTable2], fields=[d, e, f])
 			ship_strategy : FORWARD
 
 			 : Operator
-				content : InnerJoin(where: (=(a, d)), select: (a, b, c, d, e, f), leftInputSpec: NoUniqueKey, rightInputSpec: NoUniqueKey)
+				content : Join(joinType=[InnerJoin], where=[(a = d)], select=[a, b, c, d, e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
 				ship_strategy : HASH
 
 				 : Operator
-					content : Calc(select: (a, b, c, e, f))
+					content : Calc(select=[a, b, c, e, f])
 					ship_strategy : FORWARD
 
 ]]>
@@ -289,11 +289,11 @@ Union(all=[true], union=[a, b, c], updateAsRetraction=[false], accMode=[Acc]): r
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable2), fields:(d, e, f))
+			content : SourceConversion(table=[default_catalog.default_database.MyTable2], fields=[d, e, f])
 			ship_strategy : FORWARD
 
 ]]>
@@ -324,19 +324,19 @@ Calc(select=[a, b, c, e, f], updateAsRetraction=[false], accMode=[Acc]): rowcoun
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable2), fields:(d, e, f))
+			content : SourceConversion(table=[default_catalog.default_database.MyTable2], fields=[d, e, f])
 			ship_strategy : FORWARD
 
 			 : Operator
-				content : InnerJoin(where: (=(a, d)), select: (a, b, c, d, e, f), leftInputSpec: NoUniqueKey, rightInputSpec: NoUniqueKey)
+				content : Join(joinType=[InnerJoin], where=[(a = d)], select=[a, b, c, d, e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
 				ship_strategy : HASH
 
 				 : Operator
-					content : Calc(select: (a, b, c, e, f))
+					content : Calc(select=[a, b, c, e, f])
 					ship_strategy : FORWARD
 
 ]]>
@@ -378,19 +378,19 @@ Sink(name=[upsertSink2], fields=[a, cnt])
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : Calc(select: (a))
+			content : Calc(select=[a])
 			ship_strategy : FORWARD
 
 			 : Operator
-				content : GroupAggregate
+				content : GroupAggregate(groupBy=[a], select=[a, COUNT(*) AS cnt])
 				ship_strategy : HASH
 
 				 : Operator
-					content : Calc(where: (cnt > 10), select: (a, cnt))
+					content : Calc(select=[a, cnt], where=[(cnt > 10)])
 					ship_strategy : FORWARD
 
 					 : Operator
@@ -402,7 +402,7 @@ Sink(name=[upsertSink2], fields=[a, cnt])
 							ship_strategy : FORWARD
 
 							 : Operator
-								content : Calc(where: (cnt < 10), select: (a, cnt))
+								content : Calc(select=[a, cnt], where=[(cnt < 10)])
 								ship_strategy : FORWARD
 
 								 : Operator
@@ -460,19 +460,19 @@ Sink(name=[upsertSink2], fields=[a, cnt], updateAsRetraction=[false], accMode=[A
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : Calc(select: (a))
+			content : Calc(select=[a])
 			ship_strategy : FORWARD
 
 			 : Operator
-				content : GroupAggregate
+				content : GroupAggregate(groupBy=[a], select=[a, COUNT(*) AS cnt])
 				ship_strategy : HASH
 
 				 : Operator
-					content : Calc(where: (cnt > 10), select: (a, cnt))
+					content : Calc(select=[a, cnt], where=[(cnt > 10)])
 					ship_strategy : FORWARD
 
 					 : Operator
@@ -484,7 +484,7 @@ Sink(name=[upsertSink2], fields=[a, cnt], updateAsRetraction=[false], accMode=[A
 							ship_strategy : FORWARD
 
 							 : Operator
-								content : Calc(where: (cnt < 10), select: (a, cnt))
+								content : Calc(select=[a, cnt], where=[(cnt < 10)])
 								ship_strategy : FORWARD
 
 								 : Operator
@@ -524,11 +524,11 @@ Sink(name=[appendSink], fields=[a, b, c])
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : Calc(where: (a > 10), select: (a, b, c))
+			content : Calc(select=[a, b, c], where=[(a > 10)])
 			ship_strategy : FORWARD
 
 			 : Operator
@@ -560,11 +560,11 @@ Sink(name=[appendSink], fields=[a, b, c], updateAsRetraction=[false], accMode=[A
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : Calc(where: (a > 10), select: (a, b, c))
+			content : Calc(select=[a, b, c], where=[(a > 10)])
 			ship_strategy : FORWARD
 
 			 : Operator
@@ -595,11 +595,11 @@ SortLimit(orderBy=[a ASC], offset=[0], fetch=[5])
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : SortLimit(orderBy: [a ASC], offset: 0, fetch: 5)
+			content : SortLimit(orderBy=[a ASC], offset=[0], fetch=[5])
 			ship_strategy : GLOBAL
 
 ]]>
@@ -622,11 +622,11 @@ SortLimit(orderBy=[a ASC], offset=[0], fetch=[5], updateAsRetraction=[false], ac
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : SortLimit(orderBy: [a ASC], offset: 0, fetch: 5)
+			content : SortLimit(orderBy=[a ASC], offset=[0], fetch=[5])
 			ship_strategy : GLOBAL
 
 ]]>
@@ -689,35 +689,35 @@ Sink(name=[appendSink2], fields=[a, b], updateAsRetraction=[false], accMode=[Acc
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, T1), fields:(id1, text, rowtime))
+		content : SourceConversion(table=[default_catalog.default_database.T1], fields=[id1, text, rowtime])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : WatermarkAssigner(rowtime: 2, offset: 0)
+			content : WatermarkAssigner(fields=[id1, text, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
 			ship_strategy : FORWARD
 
 			 : Operator
-				content : SourceConversion(table:Buffer(default_catalog, default_database, T2), fields:(id2, cnt, name, goods, rowtime))
+				content : SourceConversion(table=[default_catalog.default_database.T2], fields=[id2, cnt, name, goods, rowtime])
 				ship_strategy : FORWARD
 
 				 : Operator
-					content : WatermarkAssigner(rowtime: 4, offset: 0)
+					content : WatermarkAssigner(fields=[id2, cnt, name, goods, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
 					ship_strategy : FORWARD
 
 					 : Operator
-						content : Co-Process
+						content : WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=2, rightTimeIndex=4], where=[((id1 = id2) AND (CAST(rowtime) > (CAST(rowtime0) - 300000:INTERVAL MINUTE)) AND (CAST(rowtime) < (CAST(rowtime0) + 180000:INTERVAL MINUTE)))], select=[id1, text, rowtime, id2, cnt, name, goods, rowtime0])
 						ship_strategy : HASH
 
 						 : Operator
-							content : Calc(select: (id1, rowtime AS ts, text))
+							content : Calc(select=[id1, rowtime AS ts, text])
 							ship_strategy : FORWARD
 
 							 : Operator
-								content : Calc(select: (id1, ts, text, _UTF-16LE'#' AS $f3))
+								content : Calc(select=[id1, ts, text, _UTF-16LE'#' AS $f3])
 								ship_strategy : FORWARD
 
 								 : Operator
-									content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
+									content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
 									ship_strategy : HASH
 
 									 : Operator
@@ -725,11 +725,11 @@ Sink(name=[appendSink2], fields=[a, b], updateAsRetraction=[false], accMode=[Acc
 										ship_strategy : FORWARD
 
 										 : Operator
-											content : Calc(select: (id1, ts, text, _UTF-16LE'*' AS $f3))
+											content : Calc(select=[id1, ts, text, _UTF-16LE'*' AS $f3])
 											ship_strategy : FORWARD
 
 											 : Operator
-												content : window: (SlidingGroupWindow('w$, ts, 6000, 12000)), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
+												content : GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 6000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
 												ship_strategy : HASH
 
 												 : Operator
@@ -769,11 +769,11 @@ Union(all=[true], union=[a, b, c])
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable1), fields:(a, b, c))
+		content : SourceConversion(table=[default_catalog.default_database.MyTable1], fields=[a, b, c])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : SourceConversion(table:Buffer(default_catalog, default_database, MyTable2), fields:(d, e, f))
+			content : SourceConversion(table=[default_catalog.default_database.MyTable2], fields=[d, e, f])
 			ship_strategy : FORWARD
 
 ]]>
@@ -836,35 +836,35 @@ Sink(name=[appendSink2], fields=[a, b])
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, T1), fields:(id1, text, rowtime))
+		content : SourceConversion(table=[default_catalog.default_database.T1], fields=[id1, text, rowtime])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : WatermarkAssigner(rowtime: 2, offset: 0)
+			content : WatermarkAssigner(fields=[id1, text, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
 			ship_strategy : FORWARD
 
 			 : Operator
-				content : SourceConversion(table:Buffer(default_catalog, default_database, T2), fields:(id2, cnt, name, goods, rowtime))
+				content : SourceConversion(table=[default_catalog.default_database.T2], fields=[id2, cnt, name, goods, rowtime])
 				ship_strategy : FORWARD
 
 				 : Operator
-					content : WatermarkAssigner(rowtime: 4, offset: 0)
+					content : WatermarkAssigner(fields=[id2, cnt, name, goods, rowtime], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
 					ship_strategy : FORWARD
 
 					 : Operator
-						content : Co-Process
+						content : WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=2, rightTimeIndex=4], where=[((id1 = id2) AND (CAST(rowtime) > (CAST(rowtime0) - 300000:INTERVAL MINUTE)) AND (CAST(rowtime) < (CAST(rowtime0) + 180000:INTERVAL MINUTE)))], select=[id1, text, rowtime, id2, cnt, name, goods, rowtime0])
 						ship_strategy : HASH
 
 						 : Operator
-							content : Calc(select: (id1, rowtime AS ts, text))
+							content : Calc(select=[id1, rowtime AS ts, text])
 							ship_strategy : FORWARD
 
 							 : Operator
-								content : Calc(select: (id1, ts, text, _UTF-16LE'#' AS $f3))
+								content : Calc(select=[id1, ts, text, _UTF-16LE'#' AS $f3])
 								ship_strategy : FORWARD
 
 								 : Operator
-									content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
+									content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
 									ship_strategy : HASH
 
 									 : Operator
@@ -872,11 +872,11 @@ Sink(name=[appendSink2], fields=[a, b])
 										ship_strategy : FORWARD
 
 										 : Operator
-											content : Calc(select: (id1, ts, text, _UTF-16LE'*' AS $f3))
+											content : Calc(select=[id1, ts, text, _UTF-16LE'*' AS $f3])
 											ship_strategy : FORWARD
 
 											 : Operator
-												content : window: (SlidingGroupWindow('w$, ts, 6000, 12000)), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
+												content : GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 6000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
 												ship_strategy : HASH
 
 												 : Operator
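
(Note for readers of these expected-plan updates: the operator `content` strings above all move to the `Name(term=[value], ...)` convention, e.g. `Calc(select=[a, cnt], where=[(cnt < 10)])` instead of `Calc(where: (cnt < 10), select: (a, cnt))`. The following is only a minimal, hypothetical Scala sketch of that term formatting, written for illustration; it is not the planner's actual implementation and all names in it are invented.)

    // Hedged sketch: renders operator terms in the `Name(term=[value], ...)`
    // style that the updated expected plans use. Illustrative only.
    object DescriptionFormat {
      def describe(nodeName: String, terms: Seq[(String, String)]): String =
        terms.map { case (k, v) => s"$k=[$v]" }
          .mkString(s"$nodeName(", ", ", ")")

      def main(args: Array[String]): Unit =
        // prints: Calc(select=[a, cnt], where=[(cnt < 10)])
        println(describe("Calc", Seq("select" -> "a, cnt", "where" -> "(cnt < 10)")))
    }
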
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LimitTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LimitTest.xml
index 217eb87..d2f4740 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LimitTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/LimitTest.xml
@@ -29,7 +29,7 @@ LogicalSort(fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]], values=[a, c])
+Values(type=[RecordType(INTEGER a, VARCHAR(2147483647) c)], tuples=[[]], values=[a, c])
 ]]>
     </Resource>
   </TestCase>
@@ -88,7 +88,7 @@ LogicalSort(offset=[10], fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]], values=[a, c])
+Values(type=[RecordType(INTEGER a, VARCHAR(2147483647) c)], tuples=[[]], values=[a, c])
 ]]>
     </Resource>
   </TestCase>
@@ -126,7 +126,7 @@ LogicalSort(offset=[0], fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]], values=[a, c])
+Values(type=[RecordType(INTEGER a, VARCHAR(2147483647) c)], tuples=[[]], values=[a, c])
 ]]>
     </Resource>
   </TestCase>
@@ -143,7 +143,7 @@ LogicalSort(fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]], values=[a, b, c])
+Values(type=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)], tuples=[[]], values=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml
index 92aa3c5..15f2dc0 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml
@@ -87,7 +87,7 @@ LogicalIntersect(all=[false])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]], values=[c])
+Values(type=[RecordType(VARCHAR(2147483647) c)], tuples=[[]], values=[c])
 ]]>
     </Resource>
   </TestCase>
@@ -107,7 +107,7 @@ LogicalIntersect(all=[false])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]], values=[c])
+Values(type=[RecordType(VARCHAR(2147483647) c)], tuples=[[]], values=[c])
 ]]>
     </Resource>
   </TestCase>
@@ -182,7 +182,7 @@ LogicalMinus(all=[false])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]], values=[c])
+Values(type=[RecordType(VARCHAR(2147483647) c)], tuples=[[]], values=[c])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SortLimitTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SortLimitTest.xml
index 74112af..fb45f0c 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SortLimitTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SortLimitTest.xml
@@ -49,7 +49,7 @@ LogicalSort(sort0=[$0], sort1=[$1], dir0=[DESC-nulls-last], dir1=[ASC-nulls-firs
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]], values=[a, b, c])
+Values(type=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)], tuples=[[]], values=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -85,7 +85,7 @@ LogicalSort(sort0=[$0], dir0=[DESC-nulls-last], fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]], values=[a, b, c])
+Values(type=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)], tuples=[[]], values=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -141,7 +141,7 @@ LogicalSort(sort0=[$0], dir0=[DESC-nulls-last], fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]], values=[a, b, c])
+Values(type=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)], tuples=[[]], values=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -198,7 +198,7 @@ LogicalSort(sort0=[$0], sort1=[$1], dir0=[DESC-nulls-last], dir1=[ASC-nulls-firs
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]], values=[a, b, c])
+Values(type=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)], tuples=[[]], values=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
index a7d748c..a263d16 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/UnnestTest.xml
@@ -28,7 +28,7 @@ LogicalProject(a=[$0], s=[$3])
    +- LogicalProject(s=[$0])
       +- Uncollect
          +- LogicalProject(c=[$cor0.c])
-            +- LogicalValues(tuples=[[{ 0 }]])
+            +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -61,7 +61,7 @@ LogicalProject(b=[$0], s=[$2])
       +- LogicalProject(s=[$0])
          +- Uncollect
             +- LogicalProject(set=[$cor0.set])
-               +- LogicalValues(tuples=[[{ 0 }]])
+               +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -90,7 +90,7 @@ LogicalProject(a=[$0], b=[$1], v=[$4])
    +- LogicalProject(k=[$0], v=[$1])
       +- Uncollect
          +- LogicalProject(c=[$cor0.c])
-            +- LogicalValues(tuples=[[{ 0 }]])
+            +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -121,7 +121,7 @@ LogicalProject(a=[$0], b=[$1], x=[$2], y=[$3])
       +- LogicalProject(x=[$0], y=[$1])
          +- Uncollect
             +- LogicalProject(b=[$cor0.b])
-               +- LogicalValues(tuples=[[{ 0 }]])
+               +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -151,7 +151,7 @@ LogicalProject(a=[$0], s=[$2])
       +- LogicalProject(s=[$0])
          +- Uncollect
             +- LogicalProject(set=[$cor0.set])
-               +- LogicalValues(tuples=[[{ 0 }]])
+               +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -180,7 +180,7 @@ LogicalProject(a=[$0], s=[$3])
    +- LogicalProject(s=[$0])
       +- Uncollect
          +- LogicalProject(c=[$cor0.c])
-            +- LogicalValues(tuples=[[{ 0 }]])
+            +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -209,7 +209,7 @@ LogicalProject(b=[$0], id=[$2], point=[$3])
       +- LogicalProject(id=[$0], point=[$1])
          +- Uncollect
             +- LogicalProject(set=[$cor0.set])
-               +- LogicalValues(tuples=[[{ 0 }]])
+               +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -238,7 +238,7 @@ LogicalProject(a=[$0], b=[$1], s=[$3])
    +- LogicalProject(s=[$0])
       +- Uncollect
          +- LogicalProject(b=[$cor0.b])
-            +- LogicalValues(tuples=[[{ 0 }]])
+            +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -262,7 +262,7 @@ LogicalProject(a=[$0], b=[$1], s=[$2], t=[$3])
       +- LogicalProject(s=[$0], t=[$1])
          +- Uncollect
             +- LogicalProject(b=[$cor0.b])
-               +- LogicalValues(tuples=[[{ 0 }]])
+               +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -284,7 +284,7 @@ LogicalProject(a=[$0], b=[$1], _1=[$2], _2=[$3])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b)]]])
       +- Uncollect
          +- LogicalProject(b=[$cor0.b])
-            +- LogicalValues(tuples=[[{ 0 }]])
+            +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/ValuesTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/ValuesTest.xml
index c517550..fa537d3 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/ValuesTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/ValuesTest.xml
@@ -25,16 +25,16 @@ limitations under the License.
 LogicalProject(a=[$0], b=[$1]), rowType=[RecordType(INTEGER a, DECIMAL(20, 1) b)]
 +- LogicalUnion(all=[true]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(20, 1) EXPR$1)]
    :- LogicalProject(EXPR$0=[1], EXPR$1=[2.0:DECIMAL(2, 1)]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(2, 1) EXPR$1)]
-   :  +- LogicalValues(tuples=[[{ 0 }]]), rowType=[RecordType(INTEGER ZERO)]
+   :  +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]]), rowType=[RecordType(INTEGER ZERO)]
    +- LogicalProject(EXPR$0=[3], EXPR$1=[4:BIGINT]), rowType=[RecordType(INTEGER EXPR$0, BIGINT EXPR$1)]
-      +- LogicalValues(tuples=[[{ 0 }]]), rowType=[RecordType(INTEGER ZERO)]
+      +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]]), rowType=[RecordType(INTEGER ZERO)]
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Union(all=[true], union=[EXPR$0, EXPR$1]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(20, 1) EXPR$1)]
 :- Calc(select=[1 AS EXPR$0, 2.0:DECIMAL(20, 1) AS EXPR$1]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(20, 1) EXPR$1)]
-:  +- Values(tuples=[[{ 0 }]], values=[ZERO], reuse_id=[1]), rowType=[RecordType(INTEGER ZERO)]
+:  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]], values=[ZERO], reuse_id=[1]), rowType=[RecordType(INTEGER ZERO)]
 +- Calc(select=[3 AS EXPR$0, 4.0:DECIMAL(20, 1) AS EXPR$1]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(20, 1) EXPR$1)]
    +- Reused(reference_id=[1]), rowType=[RecordType(INTEGER ZERO)]
 ]]>
@@ -47,12 +47,12 @@ Union(all=[true], union=[EXPR$0, EXPR$1]), rowType=[RecordType(INTEGER EXPR$0, D
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2])
-+- LogicalValues(tuples=[[{ 1, 2, 3 }]])
++- LogicalValues(type=[RecordType(INTEGER a, INTEGER b, INTEGER c)], tuples=[[{ 1, 2, 3 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[{ 1, 2, 3 }]], values=[a, b, c])
+Values(type=[RecordType(INTEGER a, INTEGER b, INTEGER c)], tuples=[[{ 1, 2, 3 }]], values=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -65,18 +65,18 @@ Values(tuples=[[{ 1, 2, 3 }]], values=[a, b, c])
 LogicalProject(a=[$0], b=[$1])
 +- LogicalUnion(all=[true])
    :- LogicalProject(EXPR$0=[1], EXPR$1=[2])
-   :  +- LogicalValues(tuples=[[{ 0 }]])
+   :  +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
    :- LogicalProject(EXPR$0=[3], EXPR$1=[null:INTEGER])
-   :  +- LogicalValues(tuples=[[{ 0 }]])
+   :  +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
    +- LogicalProject(EXPR$0=[4], EXPR$1=[5])
-      +- LogicalValues(tuples=[[{ 0 }]])
+      +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Union(all=[true], union=[EXPR$0, EXPR$1])
 :- Calc(select=[1 AS EXPR$0, CAST(2) AS EXPR$1])
-:  +- Values(tuples=[[{ 0 }]], values=[ZERO], reuse_id=[1])
+:  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]], values=[ZERO], reuse_id=[1])
 :- Calc(select=[3 AS EXPR$0, null:INTEGER AS EXPR$1])
 :  +- Reused(reference_id=[1])
 +- Calc(select=[4 AS EXPR$0, CAST(5) AS EXPR$1])
@@ -91,13 +91,13 @@ Union(all=[true], union=[EXPR$0, EXPR$1])
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(EXPR$0=[null:INTEGER])
-+- LogicalValues(tuples=[[{ 0 }]])
++- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[null:INTEGER AS EXPR$0])
-+- Values(tuples=[[{ 0 }]], values=[ZERO])
++- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]], values=[ZERO])
 ]]>
     </Resource>
   </TestCase>
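
(Note: the `Values`/`LogicalValues` expectations above gain a `type=[RecordType(...)]` term listing each field as `TYPE name`. Below is a hedged, self-contained Scala sketch of that rendering, for illustration only; the object and method names are invented and the field list is taken from the plans above.)

    // Hedged sketch of the RecordType(...) string seen in the updated plans.
    object RecordTypeFormat {
      def recordType(fields: Seq[(String, String)]): String =
        fields.map { case (tpe, name) => s"$tpe $name" }
          .mkString("RecordType(", ", ", ")")

      def main(args: Array[String]): Unit =
        // prints: RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)
        println(recordType(Seq(
          "INTEGER" -> "a", "BIGINT" -> "b", "VARCHAR(2147483647)" -> "c")))
    }
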
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.xml
index 43d4ef2..5d81924 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashJoinTest.xml
@@ -358,7 +358,7 @@ HashJoin(joinType=[LeftOuterJoin], where=[=(k, k0)], select=[k, v, k0, v0], isBr
 :- Calc(select=[CAST(0:BIGINT) AS k, v], where=[=(k, 0:BIGINT)])
 :  +- TableSourceScan(table=[[default_catalog, default_database, src, source: [TestTableSource(k, v)]]], fields=[k, v])
 +- Exchange(distribution=[broadcast])
-   +- Values(tuples=[[]], values=[k, v])
+   +- Values(type=[RecordType(BIGINT k, VARCHAR(2147483647) v)], tuples=[[]], values=[k, v])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopJoinTest.xml
index a4ced57..f93aa46 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopJoinTest.xml
@@ -436,7 +436,7 @@ LogicalProject(d=[$0], e=[$1], f=[$2], g=[$3], h=[$4], a=[$5], b=[$6], c=[$7])
 NestedLoopJoin(joinType=[LeftOuterJoin], where=[true], select=[d, e, f, g, h, a, b, c], build=[right])
 :- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(d, e, f, g, h)]]], fields=[d, e, f, g, h])
 +- Exchange(distribution=[broadcast])
-   +- Values(tuples=[[]], values=[a, b, c])
+   +- Values(type=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)], tuples=[[]], values=[a, b, c])
 ]]>
     </Resource>
   </TestCase>
@@ -551,7 +551,7 @@ LogicalProject(d=[$0], e=[$1], f=[$2], g=[$3], h=[$4], a=[$5], b=[$6], c=[$7])
       <![CDATA[
 NestedLoopJoin(joinType=[RightOuterJoin], where=[true], select=[d, e, f, g, h, a, b, c], build=[left])
 :- Exchange(distribution=[broadcast])
-:  +- Values(tuples=[[]], values=[d, e, f, g, h])
+:  +- Values(type=[RecordType(INTEGER d, BIGINT e, INTEGER f, VARCHAR(2147483647) g, BIGINT h)], tuples=[[]], values=[d, e, f, g, h])
 +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -697,7 +697,7 @@ NestedLoopJoin(joinType=[LeftOuterJoin], where=[=(k, k0)], select=[k, v, k0, v0]
 :- Calc(select=[CAST(0:BIGINT) AS k, v], where=[=(k, 0:BIGINT)])
 :  +- TableSourceScan(table=[[default_catalog, default_database, src, source: [TestTableSource(k, v)]]], fields=[k, v])
 +- Exchange(distribution=[broadcast])
-   +- Values(tuples=[[]], values=[k, v])
+   +- Values(type=[RecordType(BIGINT k, VARCHAR(2147483647) v)], tuples=[[]], values=[k, v])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.xml
index f19e85a..a1c52e2 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeJoinTest.xml
@@ -475,7 +475,7 @@ SortMergeJoin(joinType=[LeftOuterJoin], where=[=(k, k0)], select=[k, v, k0, v0])
 :  +- Calc(select=[CAST(0:BIGINT) AS k, v], where=[=(k, 0:BIGINT)])
 :     +- TableSourceScan(table=[[default_catalog, default_database, src, source: [TestTableSource(k, v)]]], fields=[k, v])
 +- Exchange(distribution=[hash[k]])
-   +- Values(tuples=[[]], values=[k, v])
+   +- Values(type=[RecordType(BIGINT k, VARCHAR(2147483647) v)], tuples=[[]], values=[k, v])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/CalcPruneAggregateCallRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/CalcPruneAggregateCallRuleTest.xml
index 86fa1c9..2ca68f6 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/CalcPruneAggregateCallRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/CalcPruneAggregateCallRuleTest.xml
@@ -89,7 +89,7 @@ LogicalJoin(condition=[$4], joinType=[semi])
    +- LogicalAggregate(group=[{}], m=[MIN($0)])
       +- LogicalCalc(expr#0=[{inputs}], expr#1=[true], i=[$t1])
          +- LogicalAggregate(group=[{}], EXPR$0=[SUM($0)])
-            +- LogicalValues(tuples=[[]])
+            +- LogicalValues(type=[RecordType(INTEGER a1)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -166,7 +166,7 @@ LogicalJoin(condition=[$4], joinType=[semi])
    +- LogicalAggregate(group=[{}], m=[MIN($0)])
       +- LogicalCalc(expr#0=[{inputs}], expr#1=[true], i=[$t1])
          +- LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
-            +- LogicalValues(tuples=[[]])
+            +- LogicalValues(type=[RecordType(INTEGER $f0)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -188,7 +188,7 @@ LogicalProject(a1=[$0], c1=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-LogicalCalc(expr#0..1=[{inputs}], expr#2=[10], expr#3=[>($t1, $t2)], proj#0..1=[{exprs}], $condition=[$t3])
+LogicalCalc(expr#0..1=[{inputs}], expr#2=[10], expr#3=[>($t1, $t2)], a1=[$t0], c1=[$t1], $condition=[$t3])
 +- LogicalAggregate(group=[{0}], c1=[COUNT($2)])
    +- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]])
 ]]>
@@ -279,7 +279,7 @@ LogicalProject(a2=[$0], b2=[$1], d2=[$3])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-LogicalCalc(expr#0..2=[{inputs}], expr#3=[0], expr#4=[>($t2, $t3)], proj#0..2=[{exprs}], $condition=[$t4])
+LogicalCalc(expr#0..2=[{inputs}], expr#3=[0], expr#4=[>($t2, $t3)], a2=[$t0], b2=[$t1], d2=[$t2], $condition=[$t4])
 +- LogicalAggregate(group=[{0, 1}], d2=[SUM($3)])
    +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2, d2)]]])
 ]]>
@@ -368,7 +368,7 @@ LogicalProject(a=[$0], b2=[$1], d2=[$3])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-LogicalCalc(expr#0..2=[{inputs}], proj#0..2=[{exprs}])
+LogicalCalc(expr#0..2=[{inputs}], a=[$t0], b2=[$t1], d2=[$t2])
 +- LogicalAggregate(group=[{0, 1}], d2=[SUM($3)])
    +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2, d2)]]])
 ]]>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateRemoveRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateRemoveRuleTest.xml
index c92e2a3..624bc19 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateRemoveRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkAggregateRemoveRuleTest.xml
@@ -465,13 +465,13 @@ FlinkLogicalAggregate(group=[{}], EXPR$0=[MAX($0)], EXPR$1=[SUM($1)], EXPR$2=[MI
     <Resource name="planBefore">
       <![CDATA[
 LogicalAggregate(group=[{}], EXPR$0=[MAX($0)], EXPR$1=[SUM($1)], EXPR$2=[MIN($2)])
-+- LogicalValues(tuples=[[{ 1, 2, 3 }]])
++- LogicalValues(type=[RecordType(INTEGER a, INTEGER b, INTEGER c)], tuples=[[{ 1, 2, 3 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 FlinkLogicalAggregate(group=[{}], EXPR$0=[MAX($0)], EXPR$1=[SUM($1)], EXPR$2=[MIN($2)])
-+- FlinkLogicalValues(tuples=[[{ 1, 2, 3 }]])
++- FlinkLogicalValues(type=[RecordType(INTEGER a, INTEGER b, INTEGER c)], tuples=[[{ 1, 2, 3 }]])
 ]]>
     </Resource>
   </TestCase>
@@ -501,7 +501,7 @@ FlinkLogicalCalc(select=[a, b, c])
          +- FlinkLogicalCalc(select=[true AS i])
             +- FlinkLogicalAggregate(group=[{}], EXPR$0=[SUM($0)])
                +- FlinkLogicalCalc(select=[a])
-                  +- FlinkLogicalValues(tuples=[[]])
+                  +- FlinkLogicalValues(type=[RecordType(INTEGER a, INTEGER b, VARCHAR(2147483647) c)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -521,7 +521,7 @@ LogicalAggregate(group=[{}], EXPR$0=[SUM($0)])
       <![CDATA[
 FlinkLogicalAggregate(group=[{}], EXPR$0=[SUM($0)])
 +- FlinkLogicalCalc(select=[a])
-   +- FlinkLogicalValues(tuples=[[]])
+   +- FlinkLogicalValues(type=[RecordType(INTEGER a, INTEGER b, VARCHAR(2147483647) c)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkLimit0RemoveRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkLimit0RemoveRuleTest.xml
index 649de95..1102b0a 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkLimit0RemoveRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkLimit0RemoveRuleTest.xml
@@ -39,7 +39,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
    +- LogicalProject($f0=[IS NOT NULL($0)])
       +- LogicalAggregate(group=[{}], m=[MIN($0)])
          +- LogicalProject(i=[true])
-            +- LogicalValues(tuples=[[]])
+            +- LogicalValues(type=[RecordType(INTEGER a)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -63,7 +63,7 @@ LogicalSort(fetch=[0])
 LogicalProject(a=[$0], b=[$1], c=[$2])
 +- LogicalJoin(condition=[OR(=($0, $3), IS NULL($0), IS NULL($3))], joinType=[anti])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalValues(tuples=[[]])
+   +- LogicalValues(type=[RecordType(INTEGER a)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -87,7 +87,7 @@ LogicalSort(fetch=[0])
 LogicalProject(a=[$0], b=[$1], c=[$2])
 +- LogicalJoin(condition=[=($0, $3)], joinType=[semi])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalValues(tuples=[[]])
+   +- LogicalValues(type=[RecordType(INTEGER a)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -110,7 +110,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$3], b0=[$4], c0=[$5])
 LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$3], b0=[$4], c0=[$5])
 +- LogicalJoin(condition=[true], joinType=[inner])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalValues(tuples=[[]])
+   +- LogicalValues(type=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -137,7 +137,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
    +- LogicalProject($f0=[IS NOT NULL($0)])
       +- LogicalAggregate(group=[{}], m=[MIN($0)])
          +- LogicalProject(i=[true])
-            +- LogicalValues(tuples=[[]])
+            +- LogicalValues(type=[RecordType(INTEGER a)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -154,7 +154,7 @@ LogicalSort(fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-LogicalValues(tuples=[[]])
+LogicalValues(type=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -171,7 +171,7 @@ LogicalSort(sort0=[$0], dir0=[ASC-nulls-first], offset=[10], fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-LogicalValues(tuples=[[]])
+LogicalValues(type=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -190,7 +190,7 @@ LogicalProject(a=[$0])
     <Resource name="planAfter">
       <![CDATA[
 LogicalProject(a=[$0])
-+- LogicalValues(tuples=[[]])
++- LogicalValues(type=[RecordType(INTEGER a)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -207,7 +207,7 @@ LogicalSort(sort0=[$0], dir0=[ASC-nulls-first], fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-LogicalValues(tuples=[[]])
+LogicalValues(type=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkPruneEmptyRulesTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkPruneEmptyRulesTest.xml
index d32afbb..cc063dd 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkPruneEmptyRulesTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/FlinkPruneEmptyRulesTest.xml
@@ -34,7 +34,7 @@ LogicalProject(d=[$0])
     <Resource name="planAfter">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2])
-+- LogicalValues(tuples=[[]])
++- LogicalValues(type=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml
index c42adc9..adb53b9 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/LogicalUnnestRuleTest.xml
@@ -28,7 +28,7 @@ LogicalProject(a=[$0], s=[$3])
    +- LogicalProject(s=[$0])
       +- Uncollect
          +- LogicalProject(c=[$cor0.c])
-            +- LogicalValues(tuples=[[{ 0 }]])
+            +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -63,7 +63,7 @@ LogicalProject(b=[$0], s=[$2])
       +- LogicalProject(s=[$0])
          +- Uncollect
             +- LogicalProject(set=[$cor0.set])
-               +- LogicalValues(tuples=[[{ 0 }]])
+               +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -92,7 +92,7 @@ LogicalProject(a=[$0], b=[$1], v=[$4])
    +- LogicalProject(k=[$0], v=[$1])
       +- Uncollect
          +- LogicalProject(c=[$cor0.c])
-            +- LogicalValues(tuples=[[{ 0 }]])
+            +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -125,7 +125,7 @@ LogicalProject(a=[$0], b=[$1], x=[$2], y=[$3])
       +- LogicalProject(x=[$0], y=[$1])
          +- Uncollect
             +- LogicalProject(b=[$cor0.b])
-               +- LogicalValues(tuples=[[{ 0 }]])
+               +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -159,7 +159,7 @@ LogicalProject(a=[$0], s=[$2])
       +- LogicalProject(s=[$0])
          +- Uncollect
             +- LogicalProject(set=[$cor0.set])
-               +- LogicalValues(tuples=[[{ 0 }]])
+               +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -187,7 +187,7 @@ LogicalProject(a=[$0], s=[$3])
    +- LogicalProject(s=[$0])
       +- Uncollect
          +- LogicalProject(c=[$cor0.c])
-            +- LogicalValues(tuples=[[{ 0 }]])
+            +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -218,7 +218,7 @@ LogicalProject(b=[$0], id=[$2], point=[$3])
       +- LogicalProject(id=[$0], point=[$1])
          +- Uncollect
             +- LogicalProject(set=[$cor0.set])
-               +- LogicalValues(tuples=[[{ 0 }]])
+               +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -246,7 +246,7 @@ LogicalProject(a=[$0], b=[$1], s=[$3])
    +- LogicalProject(s=[$0])
       +- Uncollect
          +- LogicalProject(b=[$cor0.b])
-            +- LogicalValues(tuples=[[{ 0 }]])
+            +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -272,7 +272,7 @@ LogicalProject(a=[$0], b=[$1], s=[$2], t=[$3])
       +- LogicalProject(s=[$0], t=[$1])
          +- Uncollect
             +- LogicalProject(b=[$cor0.b])
-               +- LogicalValues(tuples=[[{ 0 }]])
+               +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -298,7 +298,7 @@ LogicalProject(a=[$0], b=[$1], _1=[$2], _2=[$3])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b)]]])
       +- Uncollect
          +- LogicalProject(b=[$cor0.b])
-            +- LogicalValues(tuples=[[{ 0 }]])
+            +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectPruneAggregateCallRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectPruneAggregateCallRuleTest.xml
index 3df36eb..3ded214 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectPruneAggregateCallRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/ProjectPruneAggregateCallRuleTest.xml
@@ -89,7 +89,7 @@ LogicalJoin(condition=[$4], joinType=[semi])
    +- LogicalAggregate(group=[{}], m=[MIN($0)])
       +- LogicalProject(i=[true])
          +- LogicalAggregate(group=[{}], EXPR$0=[SUM($0)])
-            +- LogicalValues(tuples=[[]])
+            +- LogicalValues(type=[RecordType(INTEGER a1)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -166,7 +166,7 @@ LogicalJoin(condition=[$4], joinType=[semi])
    +- LogicalAggregate(group=[{}], m=[MIN($0)])
       +- LogicalProject(i=[true])
          +- LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
-            +- LogicalValues(tuples=[[]])
+            +- LogicalValues(type=[RecordType(INTEGER $f0)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQuerySemiJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQuerySemiJoinTest.xml
index cfae4e1..f0531e6 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQuerySemiJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/subquery/SubQuerySemiJoinTest.xml
@@ -588,7 +588,7 @@ LogicalFilter(condition=[=($0, $cor0.a)])
   LogicalTableScan(table=[[default_catalog, default_database, y, source: [TestTableSource(c, d)]]])
 })], variablesSet=[[$cor0]])
    +- LogicalProject(EXPR$0=[$0], EXPR$1=[$1])
-      +- LogicalValues(tuples=[[{ 1, 2 }]])
+      +- LogicalValues(type=[RecordType(INTEGER EXPR$0, INTEGER EXPR$1)], tuples=[[{ 1, 2 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -596,7 +596,7 @@ LogicalFilter(condition=[=($0, $cor0.a)])
 LogicalProject(a=[$0], b=[$1])
 +- LogicalJoin(condition=[=($2, $0)], joinType=[semi])
    :- LogicalProject(EXPR$0=[$0], EXPR$1=[$1])
-   :  +- LogicalValues(tuples=[[{ 1, 2 }]])
+   :  +- LogicalValues(type=[RecordType(INTEGER EXPR$0, INTEGER EXPR$1)], tuples=[[{ 1, 2 }]])
    +- LogicalProject(c=[$0])
       +- LogicalFilter(condition=[true])
          +- LogicalTableScan(table=[[default_catalog, default_database, y, source: [TestTableSource(c, d)]]])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/LimitTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/LimitTest.xml
index 607f9f4..d9904a3 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/LimitTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/LimitTest.xml
@@ -29,7 +29,7 @@ LogicalSort(fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a, BIGINT c)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -86,7 +86,7 @@ LogicalSort(offset=[10], fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a, BIGINT c)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -103,7 +103,7 @@ LogicalSort(offset=[0], fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a, BIGINT c)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -120,7 +120,7 @@ LogicalSort(fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIMESTAMP(3) proctime, TIME ATTRIBUTE(ROWTIME) rowtime)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -137,7 +137,7 @@ LogicalSort(fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(TIMESTAMP(3) proctime, BIGINT c)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -154,7 +154,7 @@ LogicalSort(fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(TIMESTAMP(3) desc, BIGINT c)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -171,7 +171,7 @@ LogicalSort(fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(BIGINT c, TIMESTAMP(3) desc)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -188,7 +188,7 @@ LogicalSort(fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(BIGINT c, TIMESTAMP(3) proctime)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -205,7 +205,7 @@ LogicalSort(fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(TIME ATTRIBUTE(ROWTIME) rowtime, BIGINT c)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -222,7 +222,7 @@ LogicalSort(fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(TIME ATTRIBUTE(ROWTIME) desc, BIGINT c)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -239,7 +239,7 @@ LogicalSort(fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(BIGINT c, TIME ATTRIBUTE(ROWTIME) desc)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -256,7 +256,7 @@ LogicalSort(fetch=[0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
index 9a1d03b..1dcbe67 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
@@ -301,43 +301,43 @@ Sink(name=[appendSink3], fields=[a, b])
 	content : collect elements with CollectionInputFormat
 
 	 : Operator
-		content : SourceConversion(table:Buffer(default_catalog, default_database, T1), fields:(id1, rowtime, text))
+		content : SourceConversion(table=[default_catalog.default_database.T1], fields=[id1, rowtime, text])
 		ship_strategy : FORWARD
 
 		 : Operator
-			content : WatermarkAssigner(rowtime: 1, offset: 0)
+			content : WatermarkAssigner(fields=[id1, rowtime, text], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
 			ship_strategy : FORWARD
 
 			 : Operator
-				content : SourceConversion(table:Buffer(default_catalog, default_database, T2), fields:(id2, rowtime, cnt, name, goods))
+				content : SourceConversion(table=[default_catalog.default_database.T2], fields=[id2, rowtime, cnt, name, goods])
 				ship_strategy : FORWARD
 
 				 : Operator
-					content : WatermarkAssigner(rowtime: 1, offset: 0)
+					content : WatermarkAssigner(fields=[id2, rowtime, cnt, name, goods], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
 					ship_strategy : FORWARD
 
 					 : Operator
-						content : Co-Process
+						content : WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=1, rightTimeIndex=1], where=[((id1 = id2) AND (CAST(rowtime) > (CAST(rowtime0) - 300000:INTERVAL MINUTE)) AND (CAST(rowtime) < (CAST(rowtime0) + 180000:INTERVAL MINUTE)))], select=[id1, rowtime, text, id2, rowtime0, cnt, name, goods])
 						ship_strategy : HASH
 
 						 : Operator
-							content : Calc(select: (id1, rowtime AS ts, text))
+							content : Calc(select=[id1, rowtime AS ts, text])
 							ship_strategy : FORWARD
 
 							 : Operator
-								content : Calc(select: (ts, id1, text, _UTF-16LE'#' AS $f3))
+								content : Calc(select=[ts, id1, text, _UTF-16LE'#' AS $f3])
 								ship_strategy : FORWARD
 
 								 : Operator
-									content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime)
+									content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
 									ship_strategy : HASH
 
 									 : Operator
-										content : Calc(select: (w$rowtime AS ts, id1, text, _UTF-16LE'*' AS $f3))
+										content : Calc(select=[w$rowtime AS ts, id1, text, _UTF-16LE'*' AS $f3])
 										ship_strategy : FORWARD
 
 										 : Operator
-											content : window: (SlidingGroupWindow('w$, ts, 4000, 12000)), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
+											content : GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 4000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
 											ship_strategy : HASH
 
 											 : Operator
@@ -345,11 +345,11 @@ Sink(name=[appendSink3], fields=[a, b])
 												ship_strategy : FORWARD
 
 												 : Operator
-													content : Calc(select: (ts, id1, text, _UTF-16LE'-' AS $f3))
+													content : Calc(select=[ts, id1, text, _UTF-16LE'-' AS $f3])
 													ship_strategy : FORWARD
 
 													 : Operator
-														content : window: (TumblingGroupWindow), groupBy: (id1), select: (id1, LISTAGG(text, $f3) AS EXPR$1)
+														content : GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
 														ship_strategy : HASH
 
 														 : Operator
@@ -357,15 +357,15 @@ Sink(name=[appendSink3], fields=[a, b])
 															ship_strategy : FORWARD
 
 															 : Operator
-																content : Calc(select: (id1, text))
+																content : Calc(select=[id1, text])
 																ship_strategy : FORWARD
 
 																 : Operator
-																	content : LocalGroupAggregate
+																	content : LocalGroupAggregate(groupBy=[id1], select=[id1, COUNT(text) AS count$0])
 																	ship_strategy : FORWARD
 
 																	 : Operator
-																		content : GlobalGroupAggregate
+																		content : GlobalGroupAggregate(groupBy=[id1], select=[id1, COUNT(count$0) AS EXPR$1])
 																		ship_strategy : HASH
 
 																		 : Operator
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
index 4317d07..1afa05f 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RankTest.xml
@@ -203,7 +203,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[a, b, c])
-+- Deduplicate(keepLastRow=[false], key=[a], order=[PROCTIME])
++- Deduplicate(keep=[FirstRow], key=[a], order=[PROCTIME])
    +- Exchange(distribution=[hash[a]])
       +- Calc(select=[a, b, c, proctime])
          +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -232,7 +232,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[a, b, c])
-+- Deduplicate(keepLastRow=[true], key=[a], order=[PROCTIME])
++- Deduplicate(keep=[LastRow], key=[a], order=[PROCTIME])
    +- Exchange(distribution=[hash[a]])
       +- Calc(select=[a, b, c, proctime])
          +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
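
(Note: the RankTest expectations above replace the boolean-valued `keepLastRow=[true|false]` term with the more readable `keep=[FirstRow|LastRow]`. A hedged Scala sketch of that mapping follows, purely for illustration; it is not the planner's code and the surrounding terms are copied from the plans above.)

    // Hedged sketch: old boolean flag -> new `keep=[...]` term.
    object DeduplicateTerm {
      def keepTerm(keepLastRow: Boolean): String =
        if (keepLastRow) "keep=[LastRow]" else "keep=[FirstRow]"

      def main(args: Array[String]): Unit =
        // prints: Deduplicate(keep=[FirstRow], key=[a], order=[PROCTIME])
        println(s"Deduplicate(${keepTerm(false)}, key=[a], order=[PROCTIME])")
    }
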
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml
index d4b1f97..b28a386 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml
@@ -87,7 +87,7 @@ LogicalIntersect(all=[false])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(VARCHAR(2147483647) c)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -107,7 +107,7 @@ LogicalIntersect(all=[false])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(VARCHAR(2147483647) c)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -182,7 +182,7 @@ LogicalMinus(all=[false])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(VARCHAR(2147483647) c)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SortLimitTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SortLimitTest.xml
index 03d9b69..b9ce519 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SortLimitTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SortLimitTest.xml
@@ -217,7 +217,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -256,7 +256,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -295,7 +295,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -418,7 +418,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -562,7 +562,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -601,7 +601,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -640,7 +640,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -763,7 +763,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -802,7 +802,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -841,7 +841,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -880,7 +880,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -919,7 +919,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -958,7 +958,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -997,7 +997,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -1036,7 +1036,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
@@ -1180,7 +1180,7 @@ LogicalProject(a=[$0])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
index e811758..b42e44f 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/UnnestTest.xml
@@ -28,7 +28,7 @@ LogicalProject(a=[$0], s=[$3])
    +- LogicalProject(s=[$0])
       +- Uncollect
          +- LogicalProject(c=[$cor0.c])
-            +- LogicalValues(tuples=[[{ 0 }]])
+            +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -61,7 +61,7 @@ LogicalProject(b=[$0], s=[$2])
       +- LogicalProject(s=[$0])
          +- Uncollect
             +- LogicalProject(set=[$cor0.set])
-               +- LogicalValues(tuples=[[{ 0 }]])
+               +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -87,7 +87,7 @@ LogicalProject(a=[$0], b=[$1], v=[$4])
    +- LogicalProject(k=[$0], v=[$1])
       +- Uncollect
          +- LogicalProject(c=[$cor0.c])
-            +- LogicalValues(tuples=[[{ 0 }]])
+            +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -118,7 +118,7 @@ LogicalProject(a=[$0], b=[$1], x=[$2], y=[$3])
       +- LogicalProject(x=[$0], y=[$1])
          +- Uncollect
             +- LogicalProject(b=[$cor0.b])
-               +- LogicalValues(tuples=[[{ 0 }]])
+               +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -148,7 +148,7 @@ LogicalProject(a=[$0], s=[$2])
       +- LogicalProject(s=[$0])
          +- Uncollect
             +- LogicalProject(set=[$cor0.set])
-               +- LogicalValues(tuples=[[{ 0 }]])
+               +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -174,7 +174,7 @@ LogicalProject(a=[$0], s=[$3])
    +- LogicalProject(s=[$0])
       +- Uncollect
          +- LogicalProject(c=[$cor0.c])
-            +- LogicalValues(tuples=[[{ 0 }]])
+            +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -203,7 +203,7 @@ LogicalProject(b=[$0], id=[$2], point=[$3])
       +- LogicalProject(id=[$0], point=[$1])
          +- Uncollect
             +- LogicalProject(set=[$cor0.set])
-               +- LogicalValues(tuples=[[{ 0 }]])
+               +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -229,7 +229,7 @@ LogicalProject(a=[$0], b=[$1], s=[$3])
    +- LogicalProject(s=[$0])
       +- Uncollect
          +- LogicalProject(b=[$cor0.b])
-            +- LogicalValues(tuples=[[{ 0 }]])
+            +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -253,7 +253,7 @@ LogicalProject(a=[$0], b=[$1], s=[$2], t=[$3])
       +- LogicalProject(s=[$0], t=[$1])
          +- Uncollect
             +- LogicalProject(b=[$cor0.b])
-               +- LogicalValues(tuples=[[{ 0 }]])
+               +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
@@ -275,7 +275,7 @@ LogicalProject(a=[$0], b=[$1], _1=[$2], _2=[$3])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b)]]])
       +- Uncollect
          +- LogicalProject(b=[$cor0.b])
-            +- LogicalValues(tuples=[[{ 0 }]])
+            +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ValuesTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ValuesTest.xml
index d2ec0f4..6f7026f 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ValuesTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ValuesTest.xml
@@ -25,16 +25,16 @@ limitations under the License.
 LogicalProject(a=[$0], b=[$1]), rowType=[RecordType(INTEGER a, DECIMAL(20, 1) b)]
 +- LogicalUnion(all=[true]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(20, 1) EXPR$1)]
    :- LogicalProject(EXPR$0=[1], EXPR$1=[2.0:DECIMAL(2, 1)]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(2, 1) EXPR$1)]
-   :  +- LogicalValues(tuples=[[{ 0 }]]), rowType=[RecordType(INTEGER ZERO)]
+   :  +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]]), rowType=[RecordType(INTEGER ZERO)]
    +- LogicalProject(EXPR$0=[3], EXPR$1=[4:BIGINT]), rowType=[RecordType(INTEGER EXPR$0, BIGINT EXPR$1)]
-      +- LogicalValues(tuples=[[{ 0 }]]), rowType=[RecordType(INTEGER ZERO)]
+      +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]]), rowType=[RecordType(INTEGER ZERO)]
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Union(all=[true], union=[EXPR$0, EXPR$1]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(20, 1) EXPR$1)]
 :- Calc(select=[1 AS EXPR$0, 2.0:DECIMAL(20, 1) AS EXPR$1]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(20, 1) EXPR$1)]
-:  +- Values(tuples=[[{ 0 }]], reuse_id=[1]), rowType=[RecordType(INTEGER ZERO)]
+:  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]], reuse_id=[1]), rowType=[RecordType(INTEGER ZERO)]
 +- Calc(select=[3 AS EXPR$0, 4.0:DECIMAL(20, 1) AS EXPR$1]), rowType=[RecordType(INTEGER EXPR$0, DECIMAL(20, 1) EXPR$1)]
    +- Reused(reference_id=[1]), rowType=[RecordType(INTEGER ZERO)]
 ]]>
@@ -47,12 +47,12 @@ Union(all=[true], union=[EXPR$0, EXPR$1]), rowType=[RecordType(INTEGER EXPR$0, D
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(a=[$0], b=[$1], c=[$2])
-+- LogicalValues(tuples=[[{ 1, 2, 3 }]])
++- LogicalValues(type=[RecordType(INTEGER a, INTEGER b, INTEGER c)], tuples=[[{ 1, 2, 3 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[{ 1, 2, 3 }]])
+Values(type=[RecordType(INTEGER a, INTEGER b, INTEGER c)], tuples=[[{ 1, 2, 3 }]])
 ]]>
     </Resource>
   </TestCase>
@@ -65,18 +65,18 @@ Values(tuples=[[{ 1, 2, 3 }]])
 LogicalProject(a=[$0], b=[$1])
 +- LogicalUnion(all=[true])
    :- LogicalProject(EXPR$0=[1], EXPR$1=[2])
-   :  +- LogicalValues(tuples=[[{ 0 }]])
+   :  +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
    :- LogicalProject(EXPR$0=[3], EXPR$1=[null:INTEGER])
-   :  +- LogicalValues(tuples=[[{ 0 }]])
+   :  +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
    +- LogicalProject(EXPR$0=[4], EXPR$1=[5])
-      +- LogicalValues(tuples=[[{ 0 }]])
+      +- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Union(all=[true], union=[EXPR$0, EXPR$1])
 :- Calc(select=[1 AS EXPR$0, CAST(2) AS EXPR$1])
-:  +- Values(tuples=[[{ 0 }]], reuse_id=[1])
+:  +- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]], reuse_id=[1])
 :- Calc(select=[3 AS EXPR$0, null:INTEGER AS EXPR$1])
 :  +- Reused(reference_id=[1])
 +- Calc(select=[4 AS EXPR$0, CAST(5) AS EXPR$1])
@@ -91,13 +91,13 @@ Union(all=[true], union=[EXPR$0, EXPR$1])
     <Resource name="planBefore">
       <![CDATA[
 LogicalProject(EXPR$0=[null:INTEGER])
-+- LogicalValues(tuples=[[{ 0 }]])
++- LogicalValues(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[null:INTEGER AS EXPR$0])
-+- Values(tuples=[[{ 0 }]])
++- Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
index e60b372..5d63a64 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
@@ -1077,7 +1077,7 @@ Join(joinType=[LeftOuterJoin], where=[=(key, key0)], select=[key, v, key0, v0],
 :  +- Calc(select=[CAST(0:BIGINT) AS key, v], where=[=(key, 0:BIGINT)], updateAsRetraction=[true], accMode=[Acc])
 :     +- TableSourceScan(table=[[default_catalog, default_database, src, source: [TestTableSource(key, v)]]], fields=[key, v], updateAsRetraction=[true], accMode=[Acc])
 +- Exchange(distribution=[hash[key]], updateAsRetraction=[true], accMode=[Acc])
-   +- Values(tuples=[[]], updateAsRetraction=[true], accMode=[Acc])
+   +- Values(type=[RecordType(BIGINT key, VARCHAR(2147483647) v)], tuples=[[]], updateAsRetraction=[true], accMode=[Acc])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
index f0d9841..5053e8d 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
@@ -99,7 +99,7 @@ LogicalProject(a=[$5], c=[$7], c0=[$2])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Values(tuples=[[]])
+Values(type=[RecordType(INTEGER a, BIGINT c, BIGINT c0)], tuples=[[]])
 ]]>
     </Resource>
   </TestCase>
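
All of the expected-plan updates above follow from the single test-utility change in the next file: plans are now printed with SqlExplainLevel.DIGEST_ATTRIBUTES instead of EXPPLAN_ATTRIBUTES, and Calcite's Values node includes its row type only in digest-level output. A minimal sketch of the difference (plain Calcite RelOptUtil is used here for illustration; the patched utility goes through FlinkRelOptUtil, and `values` is assumed to be a LogicalValues node obtained from a planner):

    import org.apache.calcite.plan.RelOptUtil
    import org.apache.calcite.rel.RelNode
    import org.apache.calcite.sql.SqlExplainLevel

    def dumpBothLevels(values: RelNode): Unit = {
      // EXPPLAN_ATTRIBUTES: Values(tuples=[[{ 0 }]])
      println(RelOptUtil.toString(values, SqlExplainLevel.EXPPLAN_ATTRIBUTES))
      // DIGEST_ATTRIBUTES: Values(type=[RecordType(INTEGER ZERO)], tuples=[[{ 0 }]])
      println(RelOptUtil.toString(values, SqlExplainLevel.DIGEST_ATTRIBUTES))
    }
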
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index b117a11..e0d629d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -241,7 +241,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
   def verifyPlan(sql: String): Unit = {
     doVerifyPlan(
       sql,
-      SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+      SqlExplainLevel.DIGEST_ATTRIBUTES,
       withRowType = false,
       printPlanBefore = true)
   }
@@ -249,7 +249,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
   def verifyPlan(table: Table): Unit = {
     doVerifyPlan(
       table,
-      SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+      SqlExplainLevel.DIGEST_ATTRIBUTES,
       withRowType = false,
       printPlanBefore = true)
   }
@@ -257,7 +257,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
   def verifyPlanWithType(sql: String): Unit = {
     doVerifyPlan(
       sql,
-      explainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+      explainLevel = SqlExplainLevel.DIGEST_ATTRIBUTES,
       withRowType = true,
       printPlanBefore = true)
   }
@@ -265,7 +265,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
   def verifyPlanWithType(table: Table): Unit = {
     doVerifyPlan(
       table,
-      explainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+      explainLevel = SqlExplainLevel.DIGEST_ATTRIBUTES,
       withRowType = true,
       printPlanBefore = true)
   }
@@ -279,7 +279,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
     val relNode = TableTestUtil.toRelNode(table)
     val optimizedPlan = getOptimizedPlan(
       Array(relNode),
-      explainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+      explainLevel = SqlExplainLevel.DIGEST_ATTRIBUTES,
       withRetractTraits = false,
       withRowType = false)
     val result = notExpected.forall(!optimizedPlan.contains(_))
@@ -337,7 +337,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
       val planBefore = SystemUtils.LINE_SEPARATOR +
         FlinkRelOptUtil.toString(
           relNode,
-          SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+          SqlExplainLevel.DIGEST_ATTRIBUTES,
           withRowType = withRowType)
       assertEqualsOrExpand("planBefore", planBefore)
     }
@@ -351,7 +351,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
     val table = getTableEnv.sqlQuery(sql)
     doVerifyPlan(
       table,
-      explainLevel = SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+      explainLevel = SqlExplainLevel.DIGEST_ATTRIBUTES,
       withRowType = false,
       withRetractTraits = false,
       printResource = true,
@@ -390,7 +390,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
       val planBefore = SystemUtils.LINE_SEPARATOR +
         FlinkRelOptUtil.toString(
           relNode,
-          SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+          SqlExplainLevel.DIGEST_ATTRIBUTES,
           withRowType = withRowType)
       assertEqualsOrExpand("planBefore", planBefore)
     }
@@ -551,7 +551,7 @@ abstract class TableTestUtil(
 
   def verifyPlan(): Unit = {
     doVerifyPlan(
-      SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+      SqlExplainLevel.DIGEST_ATTRIBUTES,
       withRowType = false,
       withRetractTraits = false,
       printPlanBefore = true)
@@ -581,7 +581,7 @@ abstract class TableTestUtil(
       val planBefore = new StringBuilder
       relNodes.foreach { sink =>
         planBefore.append(System.lineSeparator)
-        planBefore.append(FlinkRelOptUtil.toString(sink, SqlExplainLevel.EXPPLAN_ATTRIBUTES))
+        planBefore.append(FlinkRelOptUtil.toString(sink, SqlExplainLevel.DIGEST_ATTRIBUTES))
       }
       assertEqualsOrExpand("planBefore", planBefore.toString())
     }
@@ -686,7 +686,7 @@ case class StreamTableTestUtil(
 
   def verifyPlanWithTrait(): Unit = {
     doVerifyPlan(
-      SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+      SqlExplainLevel.DIGEST_ATTRIBUTES,
       withRetractTraits = true,
       withRowType = false,
       printPlanBefore = true)
@@ -695,7 +695,7 @@ case class StreamTableTestUtil(
   def verifyPlanWithTrait(sql: String): Unit = {
     doVerifyPlan(
       sql,
-      SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+      SqlExplainLevel.DIGEST_ATTRIBUTES,
       withRetractTraits = true,
       withRowType = false,
       printPlanBefore = true)
@@ -704,7 +704,7 @@ case class StreamTableTestUtil(
   def verifyPlanWithTrait(table: Table): Unit = {
     doVerifyPlan(
       table,
-      SqlExplainLevel.EXPPLAN_ATTRIBUTES,
+      SqlExplainLevel.DIGEST_ATTRIBUTES,
       withRetractTraits = true,
       withRowType = false,
       printPlanBefore = true)
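
As a quick end-to-end illustration of the new default, a hypothetical test against the utility above (class and test names are made up; the expected snapshot mirrors the ValuesTest case earlier in this patch):

    import org.apache.flink.table.planner.utils.TableTestBase
    import org.junit.Test

    class ValuesExplainTest extends TableTestBase {

      private val util = streamTestUtil()

      @Test
      def testValuesCarriesRowType(): Unit = {
        // With DIGEST_ATTRIBUTES as the default explain level, the snapshot
        // written by verifyPlan now reads:
        //   Values(type=[RecordType(INTEGER a, INTEGER b, INTEGER c)], tuples=[[{ 1, 2, 3 }]])
        // instead of the former Values(tuples=[[{ 1, 2, 3 }]]).
        util.verifyPlan("SELECT * FROM (VALUES (1, 2, 3)) AS T(a, b, c)")
      }
    }
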