You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2022/03/23 13:18:14 UTC

[flink] 02/02: [FLINK-26518][table] Support BridgingSqlFunction with SqlTableFunction for Scala implicits

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aea176e1fc15a9cf0388e8b32ce3bb3688e876bf
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed Mar 9 15:38:52 2022 +0100

    [FLINK-26518][table] Support BridgingSqlFunction with SqlTableFunction for Scala implicits
    
    This closes #19137.
---
 .../operations/utils/CalculatedTableFactory.java   |  6 ++-
 .../table/api/ImplicitExpressionConversions.scala  |  8 +--
 .../calcite/sql/validate/ProcedureNamespace.java   | 35 ++++++-------
 .../table/planner/calcite/FlinkRelBuilder.java     | 59 ++++++++++++++++++++++
 .../functions/bridging/BridgingSqlFunction.java    | 39 +++++++++++++-
 .../planner/plan/QueryOperationConverter.java      | 12 +++--
 .../planner/plan/utils/SetOpRewriteUtil.scala      | 12 +++--
 .../planner/runtime/stream/sql/FunctionITCase.java | 33 +++++-------
 .../runtime/stream/table/FunctionITCase.java       | 39 +++++++-------
 .../utils/JavaUserDefinedTableFunctions.java       |  6 ++-
 .../planner/plan/batch/sql/SetOperatorsTest.xml    |  8 +--
 .../planner/plan/batch/table/CorrelateTest.xml     | 28 +++++-----
 .../planner/plan/batch/table/SetOperatorsTest.xml  |  8 +--
 .../stringexpr/CorrelateStringExpressionTest.xml   | 34 ++++++-------
 .../planner/plan/common/PartialInsertTest.xml      | 16 +++---
 .../rules/logical/RewriteIntersectAllRuleTest.xml  | 12 ++---
 .../plan/rules/logical/RewriteMinusAllRuleTest.xml | 12 ++---
 .../planner/plan/stream/sql/SetOperatorsTest.xml   |  8 +--
 .../plan/stream/table/ColumnFunctionsTest.xml      |  4 +-
 .../planner/plan/stream/table/CorrelateTest.xml    | 51 +++++++++----------
 .../stream/table/TemporalTableFunctionJoinTest.xml | 10 ++--
 .../planner/plan/batch/sql/TableSourceTest.scala   |  1 -
 .../planner/plan/batch/table/CorrelateTest.scala   | 12 -----
 .../stringexpr/CorrelateStringExpressionTest.scala |  4 --
 .../table/validation/CorrelateValidationTest.scala |  1 -
 .../planner/plan/stream/table/CorrelateTest.scala  | 17 -------
 .../table/validation/CorrelateValidationTest.scala |  3 +-
 .../runtime/batch/table/CorrelateITCase.scala      |  3 +-
 .../planner/runtime/utils/StreamingTestBase.scala  |  3 ++
 .../planner/utils/UserDefinedTableFunctions.scala  | 24 +++------
 30 files changed, 274 insertions(+), 234 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/CalculatedTableFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/CalculatedTableFactory.java
index 09b9eca..6d20abd 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/CalculatedTableFactory.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/CalculatedTableFactory.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.expressions.ExpressionUtils;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.utils.ResolvedExpressionDefaultVisitor;
 import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionKind;
 import org.apache.flink.table.operations.CalculatedQueryOperation;
 import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.types.DataType;
@@ -38,6 +39,7 @@ import java.util.Collections;
 import java.util.List;
 
 import static java.util.stream.Collectors.toList;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.isFunctionOfKind;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AS;
 
 /** Utility class for creating a valid {@link CalculatedQueryOperation} operation. */
@@ -89,7 +91,7 @@ final class CalculatedTableFactory {
                                                                                     + alias)))
                             .collect(toList());
 
-            if (!(children.get(0) instanceof CallExpression)) {
+            if (!isFunctionOfKind(children.get(0), FunctionKind.TABLE)) {
                 throw fail();
             }
 
@@ -156,7 +158,7 @@ final class CalculatedTableFactory {
 
         private ValidationException fail() {
             return new ValidationException(
-                    "A lateral join only accepts a string expression which defines a table function "
+                    "A lateral join only accepts an expression which defines a table function "
                             + "call that might be followed by some alias.");
         }
     }
diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
index b2c0ca2..c36c3d1 100644
--- a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
+++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
@@ -158,17 +158,13 @@ trait ImplicitExpressionConversions {
     }
   }
 
-  implicit class TableFunctionCall[T: TypeInformation](val t: TableFunction[T]) {
+  implicit class TableFunctionCall(val t: TableFunction[_]) {
 
     /**
       * Calls a table function for the given parameters.
       */
     def apply(params: Expression*): Expression = {
-      val resultTypeInfo: TypeInformation[T] = UserDefinedFunctionHelper
-        .getReturnTypeOfTableFunction(t, implicitly[TypeInformation[T]])
-      unresolvedCall(
-        new TableFunctionDefinition(t.getClass.getName, t, resultTypeInfo),
-        params.map(ApiExpressionUtils.objectToExpression): _*)
+      unresolvedCall(t, params.map(ApiExpressionUtils.objectToExpression): _*)
     }
   }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java
index cf9beec..22e9380 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java
@@ -21,13 +21,12 @@ import org.apache.flink.annotation.Internal;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlCallBinding;
-import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlTableFunction;
-import org.apache.calcite.sql.SqlUtil;
 import org.apache.calcite.sql.type.SqlReturnTypeInference;
-import org.apache.calcite.sql.type.SqlTypeName;
+
+import static java.util.Objects.requireNonNull;
 
 /**
  * Namespace whose contents are defined by the result of a call to a user-defined procedure.
@@ -56,25 +55,23 @@ public final class ProcedureNamespace extends AbstractNamespace {
 
     public RelDataType validateImpl(RelDataType targetRowType) {
         validator.inferUnknownTypes(validator.unknownType, scope, call);
-        final RelDataType type = validator.deriveTypeImpl(scope, call);
+        // The result is ignored but the type is derived to trigger the validation
+        validator.deriveTypeImpl(scope, call);
         final SqlOperator operator = call.getOperator();
         final SqlCallBinding callBinding = new SqlCallBinding(validator, scope, call);
-        if (operator instanceof SqlTableFunction) {
-            final SqlTableFunction tableFunction = (SqlTableFunction) operator;
-            if (type.getSqlTypeName() != SqlTypeName.CURSOR) {
-                throw new IllegalArgumentException(
-                        "Table function should have CURSOR " + "type, not " + type);
-            }
-            final SqlReturnTypeInference rowTypeInference = tableFunction.getRowTypeInference();
-            RelDataType retType = rowTypeInference.inferReturnType(callBinding);
-            return validator.getTypeFactory().createTypeWithNullability(retType, false);
-        }
-
-        // special handling of collection tables TABLE(function(...))
-        if (SqlUtil.stripAs(enclosingNode).getKind() == SqlKind.COLLECTION_TABLE) {
-            return toStruct(type, getNode());
+        if (!(operator instanceof SqlTableFunction)) {
+            throw new IllegalArgumentException(
+                    "Argument must be a table function: " + operator.getNameAsId());
         }
-        return type;
+        final SqlTableFunction tableFunction = (SqlTableFunction) operator;
+        final SqlReturnTypeInference rowTypeInference = tableFunction.getRowTypeInference();
+        final RelDataType rowRelDataType =
+                requireNonNull(
+                        rowTypeInference.inferReturnType(callBinding),
+                        () -> "got null from inferReturnType for call " + callBinding.getCall());
+        // For BridgingSqlFunction the type can still be atomic
+        // and will be wrapped with a proper field alias
+        return toStruct(rowRelDataType, getNode());
     }
 
     /** Converts a type to a struct if it is not already. */
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
index 35ab473..c5b774d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.planner.calcite.FlinkRelFactories.ExpandFactory;
 import org.apache.flink.table.planner.calcite.FlinkRelFactories.RankFactory;
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
 import org.apache.flink.table.planner.hint.FlinkHints;
 import org.apache.flink.table.planner.plan.QueryOperationConverter;
 import org.apache.flink.table.planner.plan.logical.LogicalWindow;
@@ -33,6 +34,7 @@ import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowTableAggre
 import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
 import org.apache.flink.table.runtime.operators.rank.RankRange;
 import org.apache.flink.table.runtime.operators.rank.RankType;
+import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
@@ -40,6 +42,7 @@ import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptSchema;
 import org.apache.calcite.plan.RelOptTable.ToRelContext;
 import org.apache.calcite.plan.ViewExpanders;
@@ -48,18 +51,26 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.tools.RelBuilder;
 import org.apache.calcite.tools.RelBuilderFactory;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Util;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.table.planner.plan.utils.AggregateUtil.isTableAggregate;
 
@@ -105,6 +116,54 @@ public final class FlinkRelBuilder extends RelBuilder {
         };
     }
 
+    /**
+     * {@link RelBuilder#functionScan(SqlOperator, int, Iterable)} cannot work smoothly with aliases
+     * which is why we implement a custom one. The method is static because some {@link RelOptRule}s
+     * don't use {@link FlinkRelBuilder}.
+     */
+    public static RelBuilder pushFunctionScan(
+            RelBuilder relBuilder,
+            SqlOperator operator,
+            int inputCount,
+            Iterable<RexNode> operands,
+            List<String> aliases) {
+        Preconditions.checkArgument(
+                operator instanceof BridgingSqlFunction.WithTableFunction,
+                "Table function expected.");
+        final RexBuilder rexBuilder = relBuilder.getRexBuilder();
+        final RelDataTypeFactory typeFactory = relBuilder.getTypeFactory();
+
+        final List<RelNode> inputs = new LinkedList<>();
+        for (int i = 0; i < inputCount; i++) {
+            inputs.add(0, relBuilder.build());
+        }
+
+        final List<RexNode> operandList = CollectionUtil.iterableToList(operands);
+
+        final RelDataType functionRelDataType = rexBuilder.deriveReturnType(operator, operandList);
+        final List<RelDataType> fieldRelDataTypes;
+        if (functionRelDataType.isStruct()) {
+            fieldRelDataTypes =
+                    functionRelDataType.getFieldList().stream()
+                            .map(RelDataTypeField::getType)
+                            .collect(Collectors.toList());
+        } else {
+            fieldRelDataTypes = Collections.singletonList(functionRelDataType);
+        }
+        final RelDataType rowRelDataType = typeFactory.createStructType(fieldRelDataTypes, aliases);
+
+        final RexNode call = rexBuilder.makeCall(rowRelDataType, operator, operandList);
+        final RelNode functionScan =
+                LogicalTableFunctionScan.create(
+                        relBuilder.getCluster(),
+                        inputs,
+                        call,
+                        null,
+                        rowRelDataType,
+                        Collections.emptySet());
+        return relBuilder.push(functionScan);
+    }
+
     public RelBuilder expand(List<List<RexNode>> projects, int expandIdIndex) {
         final RelNode input = build();
         final RelNode expand = expandFactory.createExpand(input, projects, expandIdIndex);
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java
index 5dbe4d8..d661787 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.FunctionIdentifier;
 import org.apache.flink.table.functions.FunctionKind;
 import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.utils.ShortcutUtils;
 import org.apache.flink.table.types.DataType;
@@ -35,6 +36,10 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlTableFunction;
+import org.apache.calcite.sql.type.SqlReturnTypeInference;
+import org.apache.calcite.tools.RelBuilder;
 
 import java.util.List;
 
@@ -52,7 +57,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * (either a system or user-defined function).
  */
 @Internal
-public final class BridgingSqlFunction extends SqlFunction {
+public class BridgingSqlFunction extends SqlFunction {
 
     private final DataTypeFactory dataTypeFactory;
 
@@ -108,6 +113,10 @@ public final class BridgingSqlFunction extends SqlFunction {
                 functionKind == FunctionKind.SCALAR || functionKind == FunctionKind.TABLE,
                 "Scalar or table function kind expected.");
 
+        if (functionKind == FunctionKind.TABLE) {
+            return new BridgingSqlFunction.WithTableFunction(
+                    dataTypeFactory, typeFactory, kind, resolvedFunction, typeInference);
+        }
         return new BridgingSqlFunction(
                 dataTypeFactory, typeFactory, kind, resolvedFunction, typeInference);
     }
@@ -177,4 +186,32 @@ public final class BridgingSqlFunction extends SqlFunction {
     public boolean isDeterministic() {
         return resolvedFunction.getDefinition().isDeterministic();
     }
+
+    // --------------------------------------------------------------------------------------------
+    // Table function extension
+    // --------------------------------------------------------------------------------------------
+
+    /** Special flavor of {@link BridgingSqlFunction} to indicate a table function to Calcite. */
+    public static class WithTableFunction extends BridgingSqlFunction implements SqlTableFunction {
+
+        private WithTableFunction(
+                DataTypeFactory dataTypeFactory,
+                FlinkTypeFactory typeFactory,
+                SqlKind kind,
+                ContextResolvedFunction resolvedFunction,
+                TypeInference typeInference) {
+            super(dataTypeFactory, typeFactory, kind, resolvedFunction, typeInference);
+        }
+
+        /**
+         * The conversion to a row type is handled on the caller side. This allows us to perform it
+         * SQL/Table API-specific. This is in particular important to set the aliases of fields
+         * correctly (see {@link FlinkRelBuilder#pushFunctionScan(RelBuilder, SqlOperator, int,
+         * Iterable, List)}).
+         */
+        @Override
+        public SqlReturnTypeInference getRowTypeInference() {
+            return getReturnTypeInference();
+        }
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
index 837b3b2..3fcf62fa 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
@@ -305,10 +305,14 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod
             final BridgingSqlFunction sqlFunction =
                     BridgingSqlFunction.of(relBuilder.getCluster(), resolvedFunction);
 
-            return relBuilder
-                    .functionScan(sqlFunction, 0, parameters)
-                    .rename(calculatedTable.getResolvedSchema().getColumnNames())
-                    .build();
+            FlinkRelBuilder.pushFunctionScan(
+                    relBuilder,
+                    sqlFunction,
+                    0,
+                    parameters,
+                    calculatedTable.getResolvedSchema().getColumnNames());
+
+            return relBuilder.build();
         }
 
         private RelNode convertLegacyTableFunction(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/SetOpRewriteUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/SetOpRewriteUtil.scala
index eb6d422..32e400f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/SetOpRewriteUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/SetOpRewriteUtil.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.utils
 
 
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions
+import org.apache.flink.table.planner.calcite.FlinkRelBuilder
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
 
 import org.apache.calcite.plan.RelOptUtil
@@ -73,12 +74,15 @@ object SetOpRewriteUtil {
     val cluster = relBuilder.getCluster
 
     val sqlFunction = BridgingSqlFunction.of(
-      relBuilder.getCluster,
+      cluster,
       BuiltInFunctionDefinitions.INTERNAL_REPLICATE_ROWS)
 
-    relBuilder
-      .functionScan(sqlFunction, 0, relBuilder.fields(Util.range(fields.size() + 1)))
-      .rename(outputRelDataType.getFieldNames)
+    FlinkRelBuilder.pushFunctionScan(
+      relBuilder,
+      sqlFunction,
+      0,
+      relBuilder.fields(Util.range(fields.size() + 1)),
+      outputRelDataType.getFieldNames)
 
     // correlated join
     val corSet = Collections.singleton(cluster.createCorrel())
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
index d39d180..ed19aba 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
@@ -64,6 +64,7 @@ import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -925,16 +926,12 @@ public class FunctionITCase extends StreamingTestBase {
     public void testInvalidUseOfSystemScalarFunction() {
         tEnv().executeSql("CREATE TABLE SinkTable(s STRING) WITH ('connector' = 'COLLECTION')");
 
-        try {
-            tEnv().explainSql("INSERT INTO SinkTable " + "SELECT * FROM TABLE(MD5('3'))");
-            fail();
-        } catch (ValidationException e) {
-            assertThat(
-                    e,
-                    hasMessage(
-                            containsString(
-                                    "Currently, only table functions can be used in a correlate operation.")));
-        }
+        assertThatThrownBy(
+                        () ->
+                                tEnv().explainSql(
+                                                "INSERT INTO SinkTable "
+                                                        + "SELECT * FROM TABLE(MD5('3'))"))
+                .hasMessageContaining("Argument must be a table function: MD5");
     }
 
     @Test
@@ -946,16 +943,12 @@ public class FunctionITCase extends StreamingTestBase {
 
         tEnv().createTemporarySystemFunction("RowTableFunction", RowTableFunction.class);
 
-        try {
-            tEnv().explainSql("INSERT INTO SinkTable " + "SELECT RowTableFunction('test')");
-            fail();
-        } catch (ValidationException e) {
-            assertThat(
-                    e,
-                    hasMessage(
-                            containsString(
-                                    "Currently, only scalar functions can be used in a projection or filter operation.")));
-        }
+        assertThatThrownBy(
+                        () ->
+                                tEnv().explainSql(
+                                                "INSERT INTO SinkTable "
+                                                        + "SELECT RowTableFunction('test')"))
+                .hasMessageContaining("Cannot call table function here: 'RowTableFunction'");
     }
 
     @Test
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/FunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/FunctionITCase.java
index 6891d62..06af8ef 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/FunctionITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/FunctionITCase.java
@@ -28,27 +28,23 @@ import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory
 import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
 import org.apache.flink.types.Row;
 
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
 import static org.apache.flink.table.api.Expressions.$;
 import static org.apache.flink.table.api.Expressions.call;
-import static org.hamcrest.CoreMatchers.containsString;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
-import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
 
 /** Tests for user defined functions in the Table API. */
 public class FunctionITCase extends StreamingTestBase {
 
-    @Rule public ExpectedException thrown = ExpectedException.none();
-
     @Test
-    public void testScalarFunction() throws Exception {
+    void testScalarFunction() throws Exception {
         final List<Row> sourceData =
                 Arrays.asList(Row.of(1, 1L, 1L), Row.of(2, 2L, 1L), Row.of(3, 3L, 1L));
 
@@ -61,7 +57,7 @@ public class FunctionITCase extends StreamingTestBase {
         tEnv().executeSql(
                         "CREATE TABLE TestTable(a INT, b BIGINT, c BIGINT) WITH ('connector' = 'COLLECTION')");
 
-        Table table =
+        final Table table =
                 tEnv().from("TestTable")
                         .select(
                                 $("a"),
@@ -75,7 +71,7 @@ public class FunctionITCase extends StreamingTestBase {
     }
 
     @Test
-    public void testJoinWithTableFunction() throws Exception {
+    void testJoinWithTableFunction() throws Exception {
         final List<Row> sourceData =
                 Arrays.asList(
                         Row.of("1,2,3"), Row.of("2,3,4"), Row.of("3,4,5"), Row.of((String) null));
@@ -103,23 +99,22 @@ public class FunctionITCase extends StreamingTestBase {
     }
 
     @Test
-    public void testLateralJoinWithScalarFunction() throws Exception {
-        thrown.expect(ValidationException.class);
-        thrown.expect(
-                hasMessage(
-                        containsString(
-                                "Currently, only table functions can be used in a correlate operation.")));
-
+    void testLateralJoinWithScalarFunction() throws Exception {
         TestCollectionTableFactory.reset();
         tEnv().executeSql("CREATE TABLE SourceTable(s STRING) WITH ('connector' = 'COLLECTION')");
         tEnv().executeSql(
                         "CREATE TABLE SinkTable(s STRING, sa ARRAY<STRING>) WITH ('connector' = 'COLLECTION')");
 
-        tEnv().from("SourceTable")
-                .joinLateral(call(new RowScalarFunction(), $("s")).as("a", "b"))
-                .select($("a"), $("b"))
-                .executeInsert("SinkTable")
-                .await();
+        assertThatThrownBy(
+                        () -> {
+                            tEnv().from("SourceTable")
+                                    .joinLateral(
+                                            call(new RowScalarFunction(), $("s")).as("a", "b"));
+                        })
+                .satisfies(
+                        anyCauseMatches(
+                                ValidationException.class,
+                                "A lateral join only accepts an expression which defines a table function"));
     }
 
     // --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
index 95f6e51..9d7d559 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.runtime.utils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple12;
+import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.functions.TableFunction;
 
@@ -34,7 +35,10 @@ public class JavaUserDefinedTableFunctions {
 
     /** Emit inputs as long. */
     public static class JavaTableFunc0 extends TableFunction<Long> {
-        public void eval(Integer a, Long b, TimestampData c) {
+        public void eval(
+                @DataTypeHint("DATE") Integer a,
+                Long b,
+                @DataTypeHint("TIMESTAMP(0)") TimestampData c) {
             collect(a.longValue());
             collect(b);
             collect(c.getMillisecond());
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml
index d34b6e8..7b89148 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml
@@ -57,8 +57,8 @@ LogicalIntersect(all=[true])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[f0 AS c])
-+- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1($f0,c))], select=[$f0,c,f0], rowType=[RecordType(BIGINT $f0, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER])
+Calc(select=[c0 AS c])
++- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1($f0,c))], select=[$f0,c,c0], rowType=[RecordType(BIGINT $f0, VARCHAR(2147483647) c, VARCHAR(2147483647) c0)], joinType=[INNER])
    +- Calc(select=[IF((vcol_left_cnt > vcol_right_cnt), vcol_right_cnt, vcol_left_cnt) AS $f0, c], where=[((vcol_left_cnt >= 1) AND (vcol_right_cnt >= 1))])
       +- HashAggregate(isMerge=[true], groupBy=[c], select=[c, Final_COUNT(count$0) AS vcol_left_cnt, Final_COUNT(count$1) AS vcol_right_cnt])
          +- Exchange(distribution=[hash[c]])
@@ -180,8 +180,8 @@ LogicalMinus(all=[true])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[f0 AS c])
-+- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,c))], select=[sum_vcol_marker,c,f0], rowType=[RecordType(BIGINT sum_vcol_marker, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER])
+Calc(select=[c0 AS c])
++- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,c))], select=[sum_vcol_marker,c,c0], rowType=[RecordType(BIGINT sum_vcol_marker, VARCHAR(2147483647) c, VARCHAR(2147483647) c0)], joinType=[INNER])
    +- Calc(select=[sum_vcol_marker, c], where=[(sum_vcol_marker > 0)])
       +- HashAggregate(isMerge=[true], groupBy=[c], select=[c, Final_SUM(sum$0) AS sum_vcol_marker])
          +- Exchange(distribution=[hash[c]])
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.xml
index d6347f7..a2d90a0 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.xml
@@ -26,13 +26,13 @@ LogicalProject(c=[$0], d=[$1])
          +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
             :- LogicalProject(a=[$0], b=[$1], c=[$2])
             :  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-            +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;])
+            +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, d])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(TableFunc0(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc0*(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -42,12 +42,12 @@ Calc(select=[c, d])
       <![CDATA[
 LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
 :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-+- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], rowType=[RecordType(INTEGER x, INTEGER y)], elementType=[class [Ljava.lang.Object;])
++- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], rowType=[RecordType(INTEGER x, INTEGER y)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-PythonCorrelate(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], correlate=[table(MockPythonTableFunction(a,b))], select=[a,b,c,x,y], rowType=[RecordType(INTEGER a, INTEGER b, VARCHAR(2147483647) c, INTEGER x, INTEGER y)], joinType=[INNER])
+PythonCorrelate(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], correlate=[table(*org.apache.flink.table.planner.utils.MockPythonTableFunction*(a,b))], select=[a,b,c,x,y], rowType=[RecordType(INTEGER a, INTEGER b, VARCHAR(2147483647) c, INTEGER x, INTEGER y)], joinType=[INNER])
 +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -62,13 +62,13 @@ LogicalProject(c=[$0], d=[$1])
          +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
             :- LogicalProject(a=[$0], b=[$1], c=[$2])
             :  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-            +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;])
+            +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, d])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(TableFunc0(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc0*(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -79,13 +79,13 @@ Calc(select=[c, d])
 LogicalProject(c=[$2], s=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, s])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -96,13 +96,13 @@ Calc(select=[c, s])
 LogicalProject(c=[$2], s=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, s])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], correlate=[table(TableFunc1(c,'$'))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c,'$'))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -114,13 +114,13 @@ LogicalFilter(condition=[>($1, _UTF-16LE'')])
 +- LogicalProject(c=[$2], s=[$3])
    +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{}])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-      +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+      +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, s], where=[(s > '')])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -131,13 +131,13 @@ Calc(select=[c, s], where=[(s > '')])
 LogicalProject(c=[$2], s=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, s])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/SetOperatorsTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/SetOperatorsTest.xml
index 598ee0e..d31bda9 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/SetOperatorsTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/SetOperatorsTest.xml
@@ -33,8 +33,8 @@ Calc(select=[EXPR$0 AS a, b, EXPR$1 AS c])
 +- HashAggregate(isMerge=[true], groupBy=[b], select=[b, Final_SUM(sum$0) AS EXPR$0, Final_COUNT(count$1) AS EXPR$1])
    +- Exchange(distribution=[hash[b]])
       +- LocalHashAggregate(groupBy=[b], select=[b, Partial_SUM(a) AS sum$0, Partial_COUNT(c) AS count$1])
-         +- Calc(select=[f0 AS a, f1 AS b, f2 AS c])
-            +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,a,b,c))], select=[sum_vcol_marker,a,b,c,f0,f1,f2], rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER f0, BIGINT f1, VARCHAR(2147483647) f2)], joinType=[INNER])
+         +- Calc(select=[a0 AS a, b0 AS b, c0 AS c])
+            +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,a,b,c))], select=[sum_vcol_marker,a,b,c,a0,b0,c0], rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER a0, BIGINT b0, VARCHAR(2147483647) c0)], joinType=[INNER])
                +- Calc(select=[sum_vcol_marker, a, b, c], where=[(sum_vcol_marker > 0)])
                   +- HashAggregate(isMerge=[true], groupBy=[a, b, c], select=[a, b, c, Final_SUM(sum$0) AS sum_vcol_marker])
                      +- Exchange(distribution=[hash[a, b, c]])
@@ -123,8 +123,8 @@ LogicalProject(b=[$1], c=[$2])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[f0 AS b, f1 AS c])
-+- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,b,c))], select=[sum_vcol_marker,b,c,f0,f1], rowType=[RecordType(BIGINT sum_vcol_marker, BIGINT b, VARCHAR(2147483647) c, BIGINT f0, VARCHAR(2147483647) f1)], joinType=[INNER])
+Calc(select=[b0 AS b, c0 AS c])
++- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,b,c))], select=[sum_vcol_marker,b,c,b0,c0], rowType=[RecordType(BIGINT sum_vcol_marker, BIGINT b, VARCHAR(2147483647) c, BIGINT b0, VARCHAR(2147483647) c0)], joinType=[INNER])
    +- Calc(select=[sum_vcol_marker, b, c], where=[(sum_vcol_marker > 0)])
       +- HashAggregate(isMerge=[true], groupBy=[b, c], select=[b, c, Final_SUM(sum$0) AS sum_vcol_marker])
          +- Exchange(distribution=[hash[b, c]])
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/stringexpr/CorrelateStringExpressionTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/stringexpr/CorrelateStringExpressionTest.xml
index 09262da..e698687 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/stringexpr/CorrelateStringExpressionTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/stringexpr/CorrelateStringExpressionTest.xml
@@ -22,13 +22,13 @@ limitations under the License.
 LogicalProject(c=[$2], s=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, s])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -39,13 +39,13 @@ Calc(select=[c, s])
 LogicalProject(c=[$2], s=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, s])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -56,13 +56,13 @@ Calc(select=[c, s])
 LogicalProject(c=[$2], s=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, s])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], correlate=[table(TableFunc1(c,'$'))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c,'$'))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -73,13 +73,13 @@ Calc(select=[c, s])
 LogicalProject(c=[$2], name=[$3], len=[$4])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, name, len])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], correlate=[table(TableFunc2(c))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc2*(c))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -90,13 +90,13 @@ Calc(select=[c, name, len])
 LogicalProject(c=[$2], name=[$3], len=[$5], adult=[$4])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.HierarchyTableFunction*($2)], rowType=[RecordType(VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.HierarchyTableFunction*($2)], rowType=[RecordType(VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, name, len, adult])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.HierarchyTableFunction*($2)], correlate=[table(HierarchyTableFunction(c))], select=[a,b,c,name,adult,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], joinType=[INNER])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.HierarchyTableFunction*($2)], correlate=[table(*org.apache.flink.table.planner.utils.HierarchyTableFunction*(c))], select=[a,b,c,name,adult,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], joinType=[INNER])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -104,16 +104,16 @@ Calc(select=[c, name, len, adult])
   <TestCase name="testCorrelateJoins6">
     <Resource name="ast">
       <![CDATA[
-LogicalProject(c=[$2], name=[$4], age=[$3])
+LogicalProject(c=[$2], name=[$3], age=[$4])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], rowType=[RecordType(INTEGER age, VARCHAR(2147483647) name)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, name, age])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], correlate=[table(PojoTableFunc(c))], select=[a,b,c,age,name], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER age, VARCHAR(2147483647) name)], joinType=[INNER])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], correlate=[table(*org.apache.flink.table.planner.utils.PojoTableFunc*(c))], select=[a,b,c,name,age], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER age)], joinType=[INNER])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -125,13 +125,13 @@ LogicalFilter(condition=[>($2, 2)])
 +- LogicalProject(c=[$2], name=[$3], len=[$4])
    +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
       :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]])
-      +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;])
+      +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, name, len])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], correlate=[table(TableFunc2(c))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER], condition=[>($1, 2)])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc2*(c))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER], condition=[>($1, 2)])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -142,13 +142,13 @@ Calc(select=[c, name, len])
 LogicalProject(a=[$0], c=[$2], s=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING($2, 2))], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING($2, 2))], rowType=[RecordType(VARCHAR(2147483647) s)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, c, s])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING($2, 2))], correlate=[table(TableFunc1(SUBSTRING(c, 2)))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING($2, 2))], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING(c, 2)))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml
index 067ed8e..1ede5ab 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml
@@ -127,8 +127,8 @@ LogicalSink(table=[default_catalog.default_database.partitioned_sink], fields=[a
     <Resource name="optimized rel plan">
       <![CDATA[
 Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, d, e, f, g])
-+- Calc(select=[f0 AS a, f1 AS c, f2 AS d, f3 AS e, f4 AS f, f5 AS g])
-   +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,a,c,d,e,f,g))], select=[sum_vcol_marker,a,c,d,e,f,g,f0,f1,f2,f3,f4,f5], rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER f0, VARCHAR(2147483647) f1, VARCHAR(2147483647) f2, DOUBLE f3, BIGINT f4, INTEGER f5)], joinType=[INNER])
++- Calc(select=[a0 AS a, c0 AS c, d0 AS d, e0 AS e, f0 AS f, g0 AS g])
+   +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,a,c,d,e,f,g))], select=[sum_vcol_marker,a,c,d,e,f,g,a0,c0,d0,e0,f0,g0], rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER a0, VARCHAR(2147483647) c0, VARCHAR(2147483647) d0, DOUBLE e0, BIGINT f0, INTEGER g0)], joinType=[INNER])
       +- Calc(select=[sum_vcol_marker, a, c, d, e, f, g], where=[>(sum_vcol_marker, 0)])
          +- GroupAggregate(groupBy=[a, c, d, e, f, g], select=[a, c, d, e, f, g, SUM_RETRACT(vcol_marker) AS sum_vcol_marker])
             +- Exchange(distribution=[hash[a, c, d, e, f, g]])
@@ -165,8 +165,8 @@ LogicalSink(table=[default_catalog.default_database.partitioned_sink], fields=[a
       <![CDATA[
 Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, d, e, f, g])
 +- Sort(orderBy=[c ASC, d ASC])
-   +- Calc(select=[f0 AS a, f1 AS c, f2 AS d, f3 AS e, f4 AS f, f5 AS g])
-      +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,a,c,d,e,f,g))], select=[sum_vcol_marker,a,c,d,e,f,g,f0,f1,f2,f3,f4,f5], rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER f0, VARCHAR(2147483647) f1, VARCHAR(2147483647) f2, DOUBLE f3, BIGINT f4, INTEGER f5)], joinType=[INNER])
+   +- Calc(select=[a0 AS a, c0 AS c, d0 AS d, e0 AS e, f0 AS f, g0 AS g])
+      +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,a,c,d,e,f,g))], select=[sum_vcol_marker,a,c,d,e,f,g,a0,c0,d0,e0,f0,g0], rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER a0, VARCHAR(2147483647) c0, VARCHAR(2147483647) d0, DOUBLE e0, BIGINT f0, INTEGER g0)], joinType=[INNER])
          +- Calc(select=[sum_vcol_marker, a, c, d, e, f, g], where=[>(sum_vcol_marker, 0)])
             +- HashAggregate(isMerge=[true], groupBy=[a, c, d, e, f, g], select=[a, c, d, e, f, g, Final_SUM(sum$0) AS sum_vcol_marker])
                +- Exchange(distribution=[hash[a, c, d, e, f, g]])
@@ -205,8 +205,8 @@ LogicalSink(table=[default_catalog.default_database.partitioned_sink], fields=[a
     <Resource name="optimized rel plan">
       <![CDATA[
 Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, d, e, f, g])
-+- Calc(select=[f0 AS a, f1 AS c, f2 AS d, f3 AS e, f4 AS f, f5 AS g])
-   +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1($f0,a,c,d,e,f,g))], select=[$f0,a,c,d,e,f,g,f0,f1,f2,f3,f4,f5], rowType=[RecordType(BIGINT $f0, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER f0, VARCHAR(2147483647) f1, VARCHAR(2147483647) f2, DOUBLE f3, BIGINT f4, INTEGER f5)], joinType=[INNER])
++- Calc(select=[a0 AS a, c0 AS c, d0 AS d, e0 AS e, f0 AS f, g0 AS g])
+   +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1($f0,a,c,d,e,f,g))], select=[$f0,a,c,d,e,f,g,a0,c0,d0,e0,f0,g0], rowType=[RecordType(BIGINT $f0, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER a0, VARCHAR(2147483647) c0, VARCHAR(2147483647) d0, DOUBLE e0, BIGINT f0, INTEGER g0)], joinType=[INNER])
       +- Calc(select=[IF(>(vcol_left_cnt, vcol_right_cnt), vcol_right_cnt, vcol_left_cnt) AS $f0, a, c, d, e, f, g], where=[AND(>=(vcol_left_cnt, 1), >=(vcol_right_cnt, 1))])
          +- GroupAggregate(groupBy=[a, c, d, e, f, g], select=[a, c, d, e, f, g, COUNT_RETRACT(vcol_left_marker) AS vcol_left_cnt, COUNT_RETRACT(vcol_right_marker) AS vcol_right_cnt])
             +- Exchange(distribution=[hash[a, c, d, e, f, g]])
@@ -243,8 +243,8 @@ LogicalSink(table=[default_catalog.default_database.partitioned_sink], fields=[a
       <![CDATA[
 Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, d, e, f, g])
 +- Sort(orderBy=[c ASC, d ASC])
-   +- Calc(select=[f0 AS a, f1 AS c, f2 AS d, f3 AS e, f4 AS f, f5 AS g])
-      +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1($f0,a,c,d,e,f,g))], select=[$f0,a,c,d,e,f,g,f0,f1,f2,f3,f4,f5], rowType=[RecordType(BIGINT $f0, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER f0, VARCHAR(2147483647) f1, VARCHAR(2147483647) f2, DOUBLE f3, BIGINT f4, INTEGER f5)], joinType=[INNER])
+   +- Calc(select=[a0 AS a, c0 AS c, d0 AS d, e0 AS e, f0 AS f, g0 AS g])
+      +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1($f0,a,c,d,e,f,g))], select=[$f0,a,c,d,e,f,g,a0,c0,d0,e0,f0,g0], rowType=[RecordType(BIGINT $f0, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER a0, VARCHAR(2147483647) c0, VARCHAR(2147483647) d0, DOUBLE e0, BIGINT f0, INTEGER g0)], joinType=[INNER])
          +- Calc(select=[IF(>(vcol_left_cnt, vcol_right_cnt), vcol_right_cnt, vcol_left_cnt) AS $f0, a, c, d, e, f, g], where=[AND(>=(vcol_left_cnt, 1), >=(vcol_right_cnt, 1))])
             +- HashAggregate(isMerge=[true], groupBy=[a, c, d, e, f, g], select=[a, c, d, e, f, g, Final_COUNT(count$0) AS vcol_left_cnt, Final_COUNT(count$1) AS vcol_right_cnt])
                +- Exchange(distribution=[hash[a, c, d, e, f, g]])
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteIntersectAllRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteIntersectAllRuleTest.xml
index c5a3d2f..ad832c4 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteIntersectAllRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteIntersectAllRuleTest.xml
@@ -43,8 +43,7 @@ LogicalProject(c=[$2])
    :           +- LogicalProject(f=[$0], vcol_left_marker=[null:BOOLEAN], vcol_right_marker=[true])
    :              +- LogicalProject(f=[$2])
    :                 +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]])
-   +- LogicalProject(c=[$0])
-      +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+   +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)])
 ]]>
     </Resource>
   </TestCase>
@@ -77,8 +76,7 @@ LogicalProject(c=[$2])
    :           +- LogicalProject(f=[$0], vcol_left_marker=[null:BOOLEAN], vcol_right_marker=[true])
    :              +- LogicalProject(f=[$2])
    :                 +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]])
-   +- LogicalProject(c=[$0])
-      +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+   +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)])
 ]]>
     </Resource>
   </TestCase>
@@ -113,8 +111,7 @@ LogicalProject(c=[$2])
          :           +- LogicalProject(d=[$0], e=[$1], f=[$2], vcol_left_marker=[null:BOOLEAN], vcol_right_marker=[true])
          :              +- LogicalProject(d=[$0], e=[$1], f=[$2])
          :                 +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]])
-         +- LogicalProject(a=[$0], b=[$1], c=[$2])
-            +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3)], rowType=[RecordType:peek_no_expand(INTEGER f0, BIGINT f1, VARCHAR(2147483647) f2)])
+         +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)])
 ]]>
     </Resource>
   </TestCase>
@@ -147,8 +144,7 @@ LogicalProject(c=[$2])
    :              +- LogicalProject(f=[$2])
    :                 +- LogicalFilter(condition=[=(1, 0)])
    :                    +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]])
-   +- LogicalProject(c=[$0])
-      +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+   +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRuleTest.xml
index ae6d9dc..e895649 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRuleTest.xml
@@ -43,8 +43,7 @@ LogicalProject(c=[$2])
    :           +- LogicalProject(f=[$0], vcol_marker=[-1:BIGINT])
    :              +- LogicalProject(f=[$2])
    :                 +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]])
-   +- LogicalProject(c=[$0])
-      +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+   +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)])
 ]]>
     </Resource>
   </TestCase>
@@ -77,8 +76,7 @@ LogicalProject(c=[$2])
    :           +- LogicalProject(f=[$0], vcol_marker=[-1:BIGINT])
    :              +- LogicalProject(f=[$2])
    :                 +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]])
-   +- LogicalProject(c=[$0])
-      +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+   +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)])
 ]]>
     </Resource>
   </TestCase>
@@ -111,8 +109,7 @@ LogicalProject(c=[$2])
    :              +- LogicalProject(f=[$2])
    :                 +- LogicalFilter(condition=[=(1, 0)])
    :                    +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]])
-   +- LogicalProject(c=[$0])
-      +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+   +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)])
 ]]>
     </Resource>
   </TestCase>
@@ -147,8 +144,7 @@ LogicalProject(c=[$2])
          :           +- LogicalProject(d=[$0], e=[$1], f=[$2], vcol_marker=[-1:BIGINT])
          :              +- LogicalProject(d=[$0], e=[$1], f=[$2])
          :                 +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]])
-         +- LogicalProject(a=[$0], b=[$1], c=[$2])
-            +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3)], rowType=[RecordType:peek_no_expand(INTEGER f0, BIGINT f1, VARCHAR(2147483647) f2)])
+         +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml
index 59d848b..61552b1 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml
@@ -58,8 +58,8 @@ LogicalIntersect(all=[true])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[f0 AS c])
-+- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1($f0,c))], select=[$f0,c,f0], rowType=[RecordType(BIGINT $f0, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER])
+Calc(select=[c0 AS c])
++- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1($f0,c))], select=[$f0,c,c0], rowType=[RecordType(BIGINT $f0, VARCHAR(2147483647) c, VARCHAR(2147483647) c0)], joinType=[INNER])
    +- Calc(select=[IF((vcol_left_cnt > vcol_right_cnt), vcol_right_cnt, vcol_left_cnt) AS $f0, c], where=[((vcol_left_cnt >= 1) AND (vcol_right_cnt >= 1))])
       +- GroupAggregate(groupBy=[c], select=[c, COUNT(vcol_left_marker) AS vcol_left_cnt, COUNT(vcol_right_marker) AS vcol_right_cnt])
          +- Exchange(distribution=[hash[c]])
@@ -182,8 +182,8 @@ LogicalMinus(all=[true])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[f0 AS c])
-+- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,c))], select=[sum_vcol_marker,c,f0], rowType=[RecordType(BIGINT sum_vcol_marker, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER])
+Calc(select=[c0 AS c])
++- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,c))], select=[sum_vcol_marker,c,c0], rowType=[RecordType(BIGINT sum_vcol_marker, VARCHAR(2147483647) c, VARCHAR(2147483647) c0)], joinType=[INNER])
    +- Calc(select=[sum_vcol_marker, c], where=[(sum_vcol_marker > 0)])
       +- GroupAggregate(groupBy=[c], select=[c, SUM(vcol_marker) AS sum_vcol_marker])
          +- Exchange(distribution=[hash[c]])
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ColumnFunctionsTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ColumnFunctionsTest.xml
index 1c8dab3..063fcca 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ColumnFunctionsTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ColumnFunctionsTest.xml
@@ -153,12 +153,12 @@ Join(joinType=[InnerJoin], where=[(int1 = int2)], select=[int1, long1, string1,
       <![CDATA[
 LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
 :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(int, long, string)]]])
-+- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age)], elementType=[class [Ljava.lang.Object;])
++- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(TableFunc0(string))], select=[int,long,string,name,age], rowType=[RecordType(DOUBLE int, BIGINT long, VARCHAR(2147483647) string, VARCHAR(2147483647) name, INTEGER age)], joinType=[INNER])
+Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc0*(string))], select=[int,long,string,name,age], rowType=[RecordType(DOUBLE int, BIGINT long, VARCHAR(2147483647) string, VARCHAR(2147483647) name, INTEGER age)], joinType=[INNER])
 +- LegacyTableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(int, long, string)]]], fields=[int, long, string])
 ]]>
     </Resource>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml
index cbee6a2..2542e8d 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml
@@ -21,12 +21,12 @@ limitations under the License.
       <![CDATA[
 LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
 :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-+- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], rowType=[RecordType(INTEGER x, INTEGER y)], elementType=[class [Ljava.lang.Object;])
++- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], rowType=[RecordType(INTEGER x, INTEGER y)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-PythonCorrelate(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], correlate=[table(MockPythonTableFunction(a,b))], select=[a,b,c,x,y], rowType=[RecordType(INTEGER a, INTEGER b, VARCHAR(2147483647) c, INTEGER x, INTEGER y)], joinType=[INNER])
+PythonCorrelate(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], correlate=[table(*org.apache.flink.table.planner.utils.MockPythonTableFunction*(a,b))], select=[a,b,c,x,y], rowType=[RecordType(INTEGER a, INTEGER b, VARCHAR(2147483647) c, INTEGER x, INTEGER y)], joinType=[INNER])
 +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -63,13 +63,13 @@ LogicalProject(c=[$0], d=[$1])
          +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
             :- LogicalProject(a=[$0], b=[$1], c=[$2])
             :  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-            +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;])
+            +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, d])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(TableFunc0(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc0*(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -84,13 +84,13 @@ LogicalProject(c=[$0], d=[$1])
          +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
             :- LogicalProject(a=[$0], b=[$1], c=[$2])
             :  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-            +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;])
+            +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, d])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(TableFunc0(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc0*(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -101,13 +101,13 @@ Calc(select=[c, d])
 LogicalProject(c=[$2], s=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, s])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -118,13 +118,13 @@ Calc(select=[c, s])
 LogicalProject(c=[$2], s=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, s])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], correlate=[table(TableFunc1(c,'$'))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c,'$'))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -135,13 +135,13 @@ Calc(select=[c, s])
 LogicalProject(c=[$2], name=[$3], len=[$4])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*(*org.apache.flink.table.planner.expressions.utils.Func13$aceadf1af6c698a4705a8fbd3984d0a3*($2))], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*(*org.apache.flink.table.planner.expressions.utils.Func13$aceadf1af6c698a4705a8fbd3984d0a3*($2))], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, name, len])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*(*org.apache.flink.table.planner.expressions.utils.Func13$aceadf1af6c698a4705a8fbd3984d0a3*($2))], correlate=[table(TableFunc2(*org.apache.flink.table.planner.expressions.utils.Func13$aceadf1af6c698a4705a8fbd3984d0a3*(c)))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*(*org.apache.flink.table.planner.expressions.utils.Func13$aceadf1af6c698a4705a8fbd3984d0a3*($2))], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc2*(*org.apache.flink.table.planner.expressions.utils.Func13$aceadf1af6c698a4705a8fbd3984d0a3*(c)))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -153,13 +153,13 @@ LogicalFilter(condition=[>($2, 2)])
 +- LogicalProject(c=[$2], name=[$3], len=[$4])
    +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-      +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;])
+      +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, name, len])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], correlate=[table(TableFunc2(c))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER], condition=[>($1, 2)])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc2*(c))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER], condition=[>($1, 2)])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -170,14 +170,13 @@ Calc(select=[c, name, len])
 LogicalProject(f0=[AS($3, _UTF-16LE'f0')], f1=[AS($4, _UTF-16LE'f1')])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(f1, f2, f3)]]])
-   +- LogicalProject(f0=[$0], f1_0=[$1])
-      +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0, INTEGER f1)])
+   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], rowType=[RecordType(VARCHAR(2147483647) f0, INTEGER f1_0)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[f0, f10 AS f1])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc2*(f3))], select=[f1,f2,f3,f0,f10], rowType=[RecordType(INTEGER f1, BIGINT f2, VARCHAR(2147483647) f3, VARCHAR(2147483647) f0, INTEGER f10)], joinType=[INNER])
+Calc(select=[f0, f1_0 AS f1])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc2*(f3))], select=[f1,f2,f3,f0,f1_0], rowType=[RecordType(INTEGER f1, BIGINT f2, VARCHAR(2147483647) f3, VARCHAR(2147483647) f0, INTEGER f1_0)], joinType=[INNER])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(f1, f2, f3)]]], fields=[f1, f2, f3])
 ]]>
     </Resource>
@@ -209,12 +208,12 @@ Correlate(invocation=[str_split(_UTF-16LE'Jack,John')], correlate=[table(str_spl
       <![CDATA[
 LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
 :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-+- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.HierarchyTableFunction*($2)], rowType=[RecordType(VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], elementType=[class [Ljava.lang.Object;])
++- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.HierarchyTableFunction*($2)], rowType=[RecordType(VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Correlate(invocation=[*org.apache.flink.table.planner.utils.HierarchyTableFunction*($2)], correlate=[table(HierarchyTableFunction(c))], select=[a,b,c,name,adult,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], joinType=[INNER])
+Correlate(invocation=[*org.apache.flink.table.planner.utils.HierarchyTableFunction*($2)], correlate=[table(*org.apache.flink.table.planner.utils.HierarchyTableFunction*(c))], select=[a,b,c,name,adult,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], joinType=[INNER])
 +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -291,13 +290,13 @@ Correlate(invocation=[str_split(_UTF-16LE'Jack,John', _UTF-16LE',')], correlate=
 LogicalProject(c=[$2], s=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, s])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -307,12 +306,12 @@ Calc(select=[c, s])
       <![CDATA[
 LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
 :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-+- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING($2, 2))], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
++- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING($2, 2))], rowType=[RecordType(VARCHAR(2147483647) s)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING($2, 2))], correlate=[table(TableFunc1(SUBSTRING(c, 2)))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
+Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING($2, 2))], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING(c, 2)))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
 +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -322,12 +321,12 @@ Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRIN
       <![CDATA[
 LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
 :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-+- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], rowType=[RecordType(INTEGER age, VARCHAR(2147483647) name)], elementType=[class [Ljava.lang.Object;])
++- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Correlate(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], correlate=[table(PojoTableFunc(c))], select=[a,b,c,age,name], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER age, VARCHAR(2147483647) name)], joinType=[INNER])
+Correlate(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], correlate=[table(*org.apache.flink.table.planner.utils.PojoTableFunc*(c))], select=[a,b,c,name,age], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER age)], joinType=[INNER])
 +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TemporalTableFunctionJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TemporalTableFunctionJoinTest.xml
index 0525902..47f118e 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TemporalTableFunctionJoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TemporalTableFunctionJoinTest.xml
@@ -25,7 +25,7 @@ LogicalJoin(condition=[=($3, $1)], joinType=[inner])
 :     +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
 :        :- LogicalProject(o_rowtime=[AS($0, _UTF-16LE'o_rowtime')], o_comment=[AS($1, _UTF-16LE'o_comment')], o_amount=[AS($2, _UTF-16LE'o_amount')], o_currency=[AS($3, _UTF-16LE'o_currency')], o_secondary_key=[AS($4, _UTF-16LE'o_secondary_key')])
 :        :  +- LogicalTableScan(table=[[default_catalog, default_database, Orders]])
-:        +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$bbf912e58a3fb2d083552961c0f87dbe*($0)], rowType=[RecordType(TIMESTAMP(3) *ROWTIME* rowtime, VARCHAR(2147483647) comment, VARCHAR(2147483647) currency, INTEGER rate, INTEGER secondary_key)], elementType=[class [Ljava.lang.Object;])
+:        +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$bbf912e58a3fb2d083552961c0f87dbe*($0)], rowType=[RecordType(TIMESTAMP(3) *ROWTIME* rowtime, VARCHAR(2147483647) comment, VARCHAR(2147483647) currency, INTEGER rate, INTEGER secondary_key)])
 +- LogicalTableScan(table=[[default_catalog, default_database, ThirdTable]])
 ]]>
     </Resource>
@@ -53,7 +53,7 @@ LogicalProject(rate=[AS(*($0, $4), _UTF-16LE'rate')])
 +- LogicalFilter(condition=[=($3, $1)])
    +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
       :- LogicalTableScan(table=[[default_catalog, default_database, Orders]])
-      +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$225833b9896cb7740aed9150b8cc7fd9*($2)], rowType=[RecordType(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP(3) *ROWTIME* rowtime)], elementType=[class [Ljava.lang.Object;])
+      +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$225833b9896cb7740aed9150b8cc7fd9*($2)], rowType=[RecordType(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP(3) *ROWTIME* rowtime)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -74,7 +74,7 @@ LogicalProject(rate=[AS(*($0, $4), _UTF-16LE'rate')])
 +- LogicalFilter(condition=[=($3, $1)])
    +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
       :- LogicalTableScan(table=[[default_catalog, default_database, Orders]])
-      +- LogicalTableFunctionScan(invocation=[Rates($2)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP(3) *ROWTIME* rowtime)])
+      +- LogicalTableFunctionScan(invocation=[Rates($2)], rowType=[RecordType(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP(3) *ROWTIME* rowtime)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -95,7 +95,7 @@ LogicalProject(rate=[AS(*($0, $4), _UTF-16LE'rate')])
 +- LogicalFilter(condition=[=($3, $1)])
    +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
       :- LogicalTableScan(table=[[default_catalog, default_database, ProctimeOrders]])
-      +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$170cc46c47df69784f267e43f61e8e9d*($2)], rowType=[RecordType(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP_LTZ(3) *PROCTIME* proctime)], elementType=[class [Ljava.lang.Object;])
+      +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$170cc46c47df69784f267e43f61e8e9d*($2)], rowType=[RecordType(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP_LTZ(3) *PROCTIME* proctime)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -117,7 +117,7 @@ LogicalProject(rate=[AS(*($0, $4), _UTF-16LE'rate')])
 +- LogicalFilter(condition=[=($3, $1)])
    +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
       :- LogicalTableScan(table=[[default_catalog, default_database, Orders]])
-      +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$225833b9896cb7740aed9150b8cc7fd9*($2)], rowType=[RecordType(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP(3) *ROWTIME* rowtime)], elementType=[class [Ljava.lang.Object;])
+      +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$225833b9896cb7740aed9150b8cc7fd9*($2)], rowType=[RecordType(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP(3) *ROWTIME* rowtime)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala
index ca28de3..cddc89d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableSourceTest.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.planner.plan.batch.sql
 
-import org.apache.flink.table.api.config.TableConfigOptions
 import org.apache.flink.table.planner.plan.optimize.RelNodeBlockPlanBuilder
 import org.apache.flink.table.planner.utils._
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.scala
index 14348ae..8ebaee6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.scala
@@ -34,8 +34,6 @@ class CorrelateTest extends TableTestBase {
     val util = batchTestUtil()
     val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val func = new TableFunc1
-    util.addFunction("func1", func)
-
     val result1 = table.joinLateral(func('c) as 's).select('c, 's)
 
     util.verifyExecPlan(result1)
@@ -46,8 +44,6 @@ class CorrelateTest extends TableTestBase {
     val util = batchTestUtil()
     val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val func = new TableFunc1
-    util.addFunction("func1", func)
-
     val result2 = table.joinLateral(func('c, "$") as 's).select('c, 's)
     util.verifyExecPlan(result2)
   }
@@ -57,8 +53,6 @@ class CorrelateTest extends TableTestBase {
     val util = batchTestUtil()
     val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val func = new TableFunc1
-    util.addFunction("func1", func)
-
     val result = table.leftOuterJoinLateral(func('c) as 's).select('c, 's).where('s > "")
     util.verifyExecPlan(result)
   }
@@ -68,8 +62,6 @@ class CorrelateTest extends TableTestBase {
     val util = batchTestUtil()
     val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val func = new TableFunc1
-    util.addFunction("func1", func)
-
     val result = table.leftOuterJoinLateral(func('c) as 's, true).select('c, 's)
     util.verifyExecPlan(result)
   }
@@ -79,8 +71,6 @@ class CorrelateTest extends TableTestBase {
     val util = batchTestUtil()
     val sourceTable = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val func = new TableFunc0
-    util.addFunction("func1", func)
-
     val result = sourceTable.select('a, 'b, 'c)
       .joinLateral(func('c) as('d, 'e))
       .select('c, 'd, 'e)
@@ -106,8 +96,6 @@ class CorrelateTest extends TableTestBase {
 
     val sourceTable = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val func = new TableFunc0
-    util.addFunction("func1", func)
-
     val result = sourceTable.select('a, 'b, 'c)
       .joinLateral(func('c) as('d, 'e))
       .select('c, 'd, 'e)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/stringexpr/CorrelateStringExpressionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/stringexpr/CorrelateStringExpressionTest.scala
index 95b3405..a4aa185 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/stringexpr/CorrelateStringExpressionTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/stringexpr/CorrelateStringExpressionTest.scala
@@ -29,9 +29,7 @@ class CorrelateStringExpressionTest extends TableTestBase {
   private val util = batchTestUtil()
   private val tab = util.addTableSource[(Int, Long, String)]("Table1", 'a, 'b, 'c)
   private val func1 = new TableFunc1
- util.addFunction("func1", func1)
   private val func2 = new TableFunc2
- util.addFunction("func2", func2)
 
   @Test
   def testCorrelateJoins1(): Unit = {
@@ -61,7 +59,6 @@ class CorrelateStringExpressionTest extends TableTestBase {
   def testCorrelateJoins5(): Unit = {
     // test hierarchy generic type
     val hierarchy = new HierarchyTableFunction
-   util.addFunction("hierarchy", hierarchy)
     val scalaTable = tab.joinLateral(
       hierarchy('c) as('name, 'adult, 'len)).select('c, 'name, 'len, 'adult)
     util.verifyExecPlan(scalaTable)
@@ -71,7 +68,6 @@ class CorrelateStringExpressionTest extends TableTestBase {
   def testCorrelateJoins6(): Unit = {
     // test pojo type
     val pojo = new PojoTableFunc
-   util.addFunction("pojo", pojo)
     val scalaTable = tab.joinLateral(pojo('c)).select('c, 'name, 'age)
     util.verifyExecPlan(scalaTable)
   }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/validation/CorrelateValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/validation/CorrelateValidationTest.scala
index d69383e..611928e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/validation/CorrelateValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/validation/CorrelateValidationTest.scala
@@ -36,7 +36,6 @@ class CorrelateValidationTest extends TableTestBase {
     val util = batchTestUtil()
     val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val func = new TableFunc1
-   util.addFunction("func1", func)
     val result = table
       .leftOuterJoinLateral(func('c) as 's, 'c === 's)
       .select('c, 's)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala
index 339e7ca..ff1be68 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala
@@ -36,8 +36,6 @@ class CorrelateTest extends TableTestBase {
     val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
     val function = new TableFunc1
-    util.addFunction("func1", function)
-
     val result1 = table.joinLateral(function('c) as 's).select('c, 's)
     util.verifyExecPlan(result1)
   }
@@ -48,7 +46,6 @@ class CorrelateTest extends TableTestBase {
     val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
     val function = new TableFunc1
-    util.addFunction("func1", function)
     // test overloading
     val result2 = table.joinLateral(function('c, "$") as 's).select('c, 's)
     util.verifyExecPlan(result2)
@@ -59,8 +56,6 @@ class CorrelateTest extends TableTestBase {
     val util = streamTestUtil()
     val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val function = new TableFunc1
-    util.addFunction("func1", function)
-
     val result = table.leftOuterJoinLateral(function('c) as 's, true).select('c, 's)
     util.verifyExecPlan(result)
   }
@@ -70,7 +65,6 @@ class CorrelateTest extends TableTestBase {
     val util = streamTestUtil()
     val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val function = new TableFunc2
-    util.addFunction("func2", function)
     val scalarFunc = new Func13("pre")
 
     val result = table.joinLateral(
@@ -84,8 +78,6 @@ class CorrelateTest extends TableTestBase {
     val util = streamTestUtil()
     val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val function = new HierarchyTableFunction
-    util.addFunction("hierarchy", function)
-
     val result = table.joinLateral(function('c) as ('name, 'adult, 'len))
     util.verifyExecPlan(result)
   }
@@ -95,8 +87,6 @@ class CorrelateTest extends TableTestBase {
     val util = streamTestUtil()
     val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val function = new PojoTableFunc
-    util.addFunction("pojo", function)
-
     val result = table.joinLateral(function('c))
     util.verifyExecPlan(result)
   }
@@ -106,8 +96,6 @@ class CorrelateTest extends TableTestBase {
     val util = streamTestUtil()
     val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val function = new TableFunc2
-    util.addFunction("func2", function)
-
     val result = table
       .joinLateral(function('c) as ('name, 'len))
       .select('c, 'name, 'len)
@@ -120,8 +108,6 @@ class CorrelateTest extends TableTestBase {
     val util = streamTestUtil()
     val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val function = new TableFunc1
-    util.addFunction("func1", function)
-
     val result = table.joinLateral(function('c.substring(2)) as 's)
     util.verifyExecPlan(result)
   }
@@ -131,8 +117,6 @@ class CorrelateTest extends TableTestBase {
     val util = streamTestUtil()
     val sourceTable = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val function = new TableFunc0
-    util.addFunction("func1", function)
-
     val result = sourceTable.select('a, 'b, 'c)
       .joinLateral(function('c) as('d, 'e))
       .select('c, 'd, 'e)
@@ -158,7 +142,6 @@ class CorrelateTest extends TableTestBase {
 
     val sourceTable = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val function = new TableFunc0
-    util.addFunction("func1", function)
     val result = sourceTable.select('a, 'b, 'c)
       .joinLateral(function('c) as('d, 'e))
       .select('c, 'd, 'e)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/CorrelateValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/CorrelateValidationTest.scala
index ec2401f..a991cc5 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/CorrelateValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/CorrelateValidationTest.scala
@@ -68,11 +68,10 @@ class CorrelateValidationTest extends TableTestBase {
     util.addFunction("func0", Func0)
 
     // SQL API call
-    // NOTE: it doesn't throw an exception but an AssertionError, maybe a Calcite bug
     expectExceptionThrown(
       util.tableEnv.sqlQuery("SELECT * FROM MyTable, LATERAL TABLE(func0(a))"),
       null,
-      classOf[AssertionError])
+      classOf[ValidationException])
 
     //========== throw exception when the parameters is not correct ===============
     // Java Table API call
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CorrelateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CorrelateITCase.scala
index 17a35c2..a125f93 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CorrelateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CorrelateITCase.scala
@@ -222,7 +222,8 @@ class CorrelateITCase extends BatchTestBase {
       'a.cast(DataTypes.TINYINT) as 'a,
       'a.cast(DataTypes.SMALLINT) as 'b,
       'b.cast(DataTypes.FLOAT) as 'c)
-        .joinLateral(tFunc('a, 'b, 'c) as ('a2, 'b2, 'c2))
+        .joinLateral(
+          tFunc('a.ifNull(0.toByte), 'b.ifNull(0.toShort), 'c.ifNull(0.toFloat)) as ('a2, 'b2, 'c2))
 
     val results = executeQuery(result)
     val expected = Seq(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala
index 9f106c4..683543b 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.types.Row
 
+import org.junit.jupiter.api.{AfterEach, BeforeEach}
 import org.junit.rules.{ExpectedException, TemporaryFolder}
 import org.junit.{After, Before, Rule}
 
@@ -44,6 +45,7 @@ class StreamingTestBase extends AbstractTestBase {
   def tempFolder: TemporaryFolder = _tempFolder
 
   @Before
+  @BeforeEach
   def before(): Unit = {
     this.env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setParallelism(4)
@@ -55,6 +57,7 @@ class StreamingTestBase extends AbstractTestBase {
   }
 
   @After
+  @AfterEach
   def after(): Unit = {
     StreamTestSink.clear()
     TestValuesTableFactory.clearAllData()
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala
index f087d5a..6fcad3d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala
@@ -29,8 +29,6 @@ import org.apache.flink.types.Row
 
 import org.junit.Assert
 
-import java.lang.Boolean
-
 import scala.annotation.varargs
 
 
@@ -117,9 +115,10 @@ class TableFunc3(data: String, conf: Map[String, String]) extends TableFunction[
 }
 
 @SerialVersionUID(1L)
+@DataTypeHint("ROW<x INT, y INT>")
 class MockPythonTableFunction extends TableFunction[Row] with PythonFunction {
 
-  def eval(x: Int, y: Int) = ???
+  def eval(x: java.lang.Integer, y: java.lang.Integer) = ???
 
   override def getResultType: TypeInformation[Row] =
     new RowTypeInfo(Types.INT, Types.INT)
@@ -368,28 +367,19 @@ class MockPythonTableFunction extends TableFunction[Row] with PythonFunction {
 //}
 
 @SerialVersionUID(1L)
+@DataTypeHint("ROW<f0 STRING, f1 STRING, f2 STRING>")
 class TableFunc4 extends TableFunction[Row] {
   def eval(b: Byte, s: Short, f: Float): Unit = {
     collect(Row.of("Byte=" + b, "Short=" + s, "Float=" + f))
   }
-
-  override def getResultType: TypeInformation[Row] = {
-    new RowTypeInfo(Types.STRING, Types.STRING, Types.STRING)
-  }
 }
 
 @SerialVersionUID(1L)
+@DataTypeHint("ROW<a INT, b INT, c INT>")
 class TableFunc6 extends TableFunction[Row] {
-  def eval(row: Row): Unit = {
+  def eval(@DataTypeHint("ROW<a INT, b INT, c INT>") row: Row): Unit = {
     collect(row)
   }
-
-  override def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] =
-    Array(new RowTypeInfo(Types.INT, Types.INT, Types.INT))
-
-  override def getResultType: TypeInformation[Row] = {
-    new RowTypeInfo(Types.INT, Types.INT, Types.INT)
-  }
 }
 
 @SerialVersionUID(1L)
@@ -421,12 +411,12 @@ class VarArgsFunc0 extends TableFunction[String] {
 }
 
 @SerialVersionUID(1L)
-class HierarchyTableFunction extends SplittableTableFunction[Boolean, Integer] {
+class HierarchyTableFunction extends SplittableTableFunction[java.lang.Boolean, Integer] {
   def eval(user: String) {
     if (user.contains("#")) {
       val splits = user.split("#")
       val age = splits(1).toInt
-      collect(new Tuple3[String, Boolean, Integer](splits(0), age >= 20, age))
+      collect(new Tuple3[String, java.lang.Boolean, Integer](splits(0), age >= 20, age))
     }
   }
 }