Posted to commits@flink.apache.org by tw...@apache.org on 2022/03/23 06:28:15 UTC

[flink] branch master updated (24f0ac2 -> e42202b)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 24f0ac2  [hotfix][python][docs] Update documentation on how to use with_configuration in EnvironmentSettings
     new 3a2b8da  [hotfix][table-planner] Deprecate SqlFunctions of old function stack
     new f8cb19e  [FLINK-26518][table-planner] Port FlinkRelBuilder to Java
     new e42202b  [FLINK-26518][table] Support BridgingSqlFunction with SqlTableFunction for Scala implicits

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../operations/utils/CalculatedTableFactory.java   |   6 +-
 .../table/api/ImplicitExpressionConversions.scala  |   8 +-
 .../calcite/sql/validate/ProcedureNamespace.java   |  35 ++-
 .../table/planner/calcite/FlinkRelBuilder.java     | 294 +++++++++++++++++++++
 .../catalog/QueryOperationCatalogViewTable.java    |   8 +-
 .../table/planner/delegation/PlannerContext.java   |   2 +-
 .../functions/bridging/BridgingSqlFunction.java    |  39 ++-
 .../planner/plan/QueryOperationConverter.java      |  12 +-
 .../plan/rules/logical/SubQueryDecorrelator.java   |   3 +-
 .../BatchPhysicalPythonWindowAggregateRule.java    |   3 +-
 ...reamPhysicalPythonGroupWindowAggregateRule.java |   2 +-
 .../table/planner/calcite/FlinkRelBuilder.scala    | 233 ----------------
 .../planner/expressions/fieldExpression.scala      |   7 -
 .../planner/expressions/windowProperties.scala     |   5 -
 .../functions/utils/ScalarSqlFunction.scala        |   5 +-
 .../nodes/calcite/LogicalWindowAggregate.scala     |   6 +-
 .../calcite/LogicalWindowTableAggregate.scala      |   4 +-
 .../plan/nodes/calcite/WindowAggregate.scala       |  11 +-
 .../plan/nodes/calcite/WindowTableAggregate.scala  |  10 +-
 .../logical/FlinkLogicalWindowAggregate.scala      |   2 +-
 .../logical/FlinkLogicalWindowTableAggregate.scala |   2 +-
 .../rules/logical/CorrelateSortToRankRule.scala    |   2 +-
 .../schema/DeferredTypeFlinkTableFunction.scala    |   6 +-
 .../plan/schema/TypedFlinkTableFunction.scala      |   6 +-
 .../planner/plan/utils/SetOpRewriteUtil.scala      |  12 +-
 .../planner/runtime/stream/sql/FunctionITCase.java |   6 +-
 .../runtime/stream/table/FunctionITCase.java       |  39 ++-
 .../utils/JavaUserDefinedTableFunctions.java       |   6 +-
 .../planner/plan/batch/sql/SetOperatorsTest.xml    |   8 +-
 .../planner/plan/batch/table/CorrelateTest.xml     |  28 +-
 .../planner/plan/batch/table/SetOperatorsTest.xml  |   8 +-
 .../planner/plan/common/PartialInsertTest.xml      |  16 +-
 .../rules/logical/RewriteIntersectAllRuleTest.xml  |  12 +-
 .../plan/rules/logical/RewriteMinusAllRuleTest.xml |  12 +-
 .../planner/plan/stream/sql/SetOperatorsTest.xml   |   8 +-
 .../plan/stream/table/ColumnFunctionsTest.xml      |   4 +-
 .../planner/plan/stream/table/CorrelateTest.xml    |  51 ++--
 .../stream/table/TemporalTableFunctionJoinTest.xml |  10 +-
 .../planner/plan/batch/table/CorrelateTest.scala   |  12 -
 .../table/validation/CorrelateValidationTest.scala |   1 -
 .../planner/plan/stream/table/CorrelateTest.scala  |  17 --
 .../table/validation/CorrelateValidationTest.scala |   3 +-
 .../runtime/batch/table/CorrelateITCase.scala      |   3 +-
 .../planner/runtime/utils/StreamingTestBase.scala  |   3 +
 .../planner/utils/UserDefinedTableFunctions.scala  |  24 +-
 45 files changed, 524 insertions(+), 470 deletions(-)
 create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
 delete mode 100644 flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala

[flink] 03/03: [FLINK-26518][table] Support BridgingSqlFunction with SqlTableFunction for Scala implicits

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e42202be9f80d49eb70a24125ed3269ab9fb1110
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed Mar 9 15:38:52 2022 +0100

    [FLINK-26518][table] Support BridgingSqlFunction with SqlTableFunction for Scala implicits
    
    This closes #19137.
---
 .../operations/utils/CalculatedTableFactory.java   |  6 ++-
 .../table/api/ImplicitExpressionConversions.scala  |  8 +--
 .../calcite/sql/validate/ProcedureNamespace.java   | 35 ++++++-------
 .../table/planner/calcite/FlinkRelBuilder.java     | 59 ++++++++++++++++++++++
 .../functions/bridging/BridgingSqlFunction.java    | 39 +++++++++++++-
 .../planner/plan/QueryOperationConverter.java      | 12 +++--
 .../planner/plan/utils/SetOpRewriteUtil.scala      | 12 +++--
 .../planner/runtime/stream/sql/FunctionITCase.java |  6 +--
 .../runtime/stream/table/FunctionITCase.java       | 39 +++++++-------
 .../utils/JavaUserDefinedTableFunctions.java       |  6 ++-
 .../planner/plan/batch/sql/SetOperatorsTest.xml    |  8 +--
 .../planner/plan/batch/table/CorrelateTest.xml     | 28 +++++-----
 .../planner/plan/batch/table/SetOperatorsTest.xml  |  8 +--
 .../planner/plan/common/PartialInsertTest.xml      | 16 +++---
 .../rules/logical/RewriteIntersectAllRuleTest.xml  | 12 ++---
 .../plan/rules/logical/RewriteMinusAllRuleTest.xml | 12 ++---
 .../planner/plan/stream/sql/SetOperatorsTest.xml   |  8 +--
 .../plan/stream/table/ColumnFunctionsTest.xml      |  4 +-
 .../planner/plan/stream/table/CorrelateTest.xml    | 51 +++++++++----------
 .../stream/table/TemporalTableFunctionJoinTest.xml | 10 ++--
 .../planner/plan/batch/table/CorrelateTest.scala   | 12 -----
 .../table/validation/CorrelateValidationTest.scala |  1 -
 .../planner/plan/stream/table/CorrelateTest.scala  | 17 -------
 .../table/validation/CorrelateValidationTest.scala |  3 +-
 .../runtime/batch/table/CorrelateITCase.scala      |  3 +-
 .../planner/runtime/utils/StreamingTestBase.scala  |  3 ++
 .../planner/utils/UserDefinedTableFunctions.scala  | 24 +++------
 27 files changed, 246 insertions(+), 196 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/CalculatedTableFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/CalculatedTableFactory.java
index 09b9eca..6d20abd 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/CalculatedTableFactory.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/CalculatedTableFactory.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.expressions.ExpressionUtils;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.expressions.utils.ResolvedExpressionDefaultVisitor;
 import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.FunctionKind;
 import org.apache.flink.table.operations.CalculatedQueryOperation;
 import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.types.DataType;
@@ -38,6 +39,7 @@ import java.util.Collections;
 import java.util.List;
 
 import static java.util.stream.Collectors.toList;
+import static org.apache.flink.table.expressions.ApiExpressionUtils.isFunctionOfKind;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AS;
 
 /** Utility class for creating a valid {@link CalculatedQueryOperation} operation. */
@@ -89,7 +91,7 @@ final class CalculatedTableFactory {
                                                                                     + alias)))
                             .collect(toList());
 
-            if (!(children.get(0) instanceof CallExpression)) {
+            if (!isFunctionOfKind(children.get(0), FunctionKind.TABLE)) {
                 throw fail();
             }
 
@@ -156,7 +158,7 @@ final class CalculatedTableFactory {
 
         private ValidationException fail() {
             return new ValidationException(
-                    "A lateral join only accepts a string expression which defines a table function "
+                    "A lateral join only accepts an expression which defines a table function "
                             + "call that might be followed by some alias.");
         }
     }
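
For illustration, the relaxed check means the first expression of a lateral join
only has to resolve to a call of kind TABLE; anything else (for example a scalar
function call) now fails with the reworded message above. A minimal Scala sketch,
where `Split` is a hypothetical user-defined table function and `table` is
assumed to be in scope:

    import org.apache.flink.table.api._
    import org.apache.flink.table.functions.TableFunction

    // Hypothetical table function, for illustration only.
    class Split extends TableFunction[String] {
      def eval(s: String): Unit = s.split(",").foreach(collect)
    }

    // Accepted: an expression that defines a table function call plus an alias.
    table.joinLateral(call(new Split, $"s").as("word"))
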
diff --git a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
index b2c0ca2..c36c3d1 100644
--- a/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
+++ b/flink-table/flink-table-api-scala/src/main/scala/org/apache/flink/table/api/ImplicitExpressionConversions.scala
@@ -158,17 +158,13 @@ trait ImplicitExpressionConversions {
     }
   }
 
-  implicit class TableFunctionCall[T: TypeInformation](val t: TableFunction[T]) {
+  implicit class TableFunctionCall(val t: TableFunction[_]) {
 
     /**
       * Calls a table function for the given parameters.
       */
     def apply(params: Expression*): Expression = {
-      val resultTypeInfo: TypeInformation[T] = UserDefinedFunctionHelper
-        .getReturnTypeOfTableFunction(t, implicitly[TypeInformation[T]])
-      unresolvedCall(
-        new TableFunctionDefinition(t.getClass.getName, t, resultTypeInfo),
-        params.map(ApiExpressionUtils.objectToExpression): _*)
+      unresolvedCall(t, params.map(ApiExpressionUtils.objectToExpression): _*)
     }
   }
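
With the reworked implicit above, a TableFunction instance can be applied
directly as an expression; the TypeInformation context bound and the detour
through TableFunctionDefinition are gone because the output type is now derived
by the new type inference. A hedged sketch, reusing the hypothetical `Split`
function from the previous example:

    val split = new Split

    // The TableFunctionCall implicit makes the instance callable:
    table.joinLateral(split($"s").as("word"))
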
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java
index cf9beec..22e9380 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java
@@ -21,13 +21,12 @@ import org.apache.flink.annotation.Internal;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlCallBinding;
-import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlTableFunction;
-import org.apache.calcite.sql.SqlUtil;
 import org.apache.calcite.sql.type.SqlReturnTypeInference;
-import org.apache.calcite.sql.type.SqlTypeName;
+
+import static java.util.Objects.requireNonNull;
 
 /**
  * Namespace whose contents are defined by the result of a call to a user-defined procedure.
@@ -56,25 +55,23 @@ public final class ProcedureNamespace extends AbstractNamespace {
 
     public RelDataType validateImpl(RelDataType targetRowType) {
         validator.inferUnknownTypes(validator.unknownType, scope, call);
-        final RelDataType type = validator.deriveTypeImpl(scope, call);
+        // The result is ignored but the type is derived to trigger the validation
+        validator.deriveTypeImpl(scope, call);
         final SqlOperator operator = call.getOperator();
         final SqlCallBinding callBinding = new SqlCallBinding(validator, scope, call);
-        if (operator instanceof SqlTableFunction) {
-            final SqlTableFunction tableFunction = (SqlTableFunction) operator;
-            if (type.getSqlTypeName() != SqlTypeName.CURSOR) {
-                throw new IllegalArgumentException(
-                        "Table function should have CURSOR " + "type, not " + type);
-            }
-            final SqlReturnTypeInference rowTypeInference = tableFunction.getRowTypeInference();
-            RelDataType retType = rowTypeInference.inferReturnType(callBinding);
-            return validator.getTypeFactory().createTypeWithNullability(retType, false);
-        }
-
-        // special handling of collection tables TABLE(function(...))
-        if (SqlUtil.stripAs(enclosingNode).getKind() == SqlKind.COLLECTION_TABLE) {
-            return toStruct(type, getNode());
+        if (!(operator instanceof SqlTableFunction)) {
+            throw new IllegalArgumentException(
+                    "Argument must be a table function: " + operator.getNameAsId());
         }
-        return type;
+        final SqlTableFunction tableFunction = (SqlTableFunction) operator;
+        final SqlReturnTypeInference rowTypeInference = tableFunction.getRowTypeInference();
+        final RelDataType rowRelDataType =
+                requireNonNull(
+                        rowTypeInference.inferReturnType(callBinding),
+                        () -> "got null from inferReturnType for call " + callBinding.getCall());
+        // For BridgingSqlFunction the type can still be atomic
+        // and will be wrapped with a proper field alias
+        return toStruct(rowRelDataType, getNode());
     }
 
     /** Converts a type to a struct if it is not already. */
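
After this change, every call validated through ProcedureNamespace must expose
Calcite's SqlTableFunction marker; the CURSOR special case and the
collection-table branch are gone. The observable effect (mirrored by the
FunctionITCase update further below) is an early validation error for
non-table functions, sketched here with an assumed `tEnv`:

    // Fails during validation with "Argument must be a table function: MD5"
    tEnv.explainSql("SELECT * FROM TABLE(MD5('3'))")
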
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
index 35ab473..c5b774d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.planner.calcite.FlinkRelFactories.ExpandFactory;
 import org.apache.flink.table.planner.calcite.FlinkRelFactories.RankFactory;
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
 import org.apache.flink.table.planner.hint.FlinkHints;
 import org.apache.flink.table.planner.plan.QueryOperationConverter;
 import org.apache.flink.table.planner.plan.logical.LogicalWindow;
@@ -33,6 +34,7 @@ import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowTableAggre
 import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
 import org.apache.flink.table.runtime.operators.rank.RankRange;
 import org.apache.flink.table.runtime.operators.rank.RankType;
+import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
@@ -40,6 +42,7 @@ import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptSchema;
 import org.apache.calcite.plan.RelOptTable.ToRelContext;
 import org.apache.calcite.plan.ViewExpanders;
@@ -48,18 +51,26 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.tools.RelBuilder;
 import org.apache.calcite.tools.RelBuilderFactory;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Util;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.table.planner.plan.utils.AggregateUtil.isTableAggregate;
 
@@ -105,6 +116,54 @@ public final class FlinkRelBuilder extends RelBuilder {
         };
     }
 
+    /**
+     * {@link RelBuilder#functionScan(SqlOperator, int, Iterable)} cannot work smoothly with
+     * aliases, which is why we implement a custom one. The method is static because some
+     * {@link RelOptRule}s don't use {@link FlinkRelBuilder}.
+     */
+    public static RelBuilder pushFunctionScan(
+            RelBuilder relBuilder,
+            SqlOperator operator,
+            int inputCount,
+            Iterable<RexNode> operands,
+            List<String> aliases) {
+        Preconditions.checkArgument(
+                operator instanceof BridgingSqlFunction.WithTableFunction,
+                "Table function expected.");
+        final RexBuilder rexBuilder = relBuilder.getRexBuilder();
+        final RelDataTypeFactory typeFactory = relBuilder.getTypeFactory();
+
+        final List<RelNode> inputs = new LinkedList<>();
+        for (int i = 0; i < inputCount; i++) {
+            inputs.add(0, relBuilder.build());
+        }
+
+        final List<RexNode> operandList = CollectionUtil.iterableToList(operands);
+
+        final RelDataType functionRelDataType = rexBuilder.deriveReturnType(operator, operandList);
+        final List<RelDataType> fieldRelDataTypes;
+        if (functionRelDataType.isStruct()) {
+            fieldRelDataTypes =
+                    functionRelDataType.getFieldList().stream()
+                            .map(RelDataTypeField::getType)
+                            .collect(Collectors.toList());
+        } else {
+            fieldRelDataTypes = Collections.singletonList(functionRelDataType);
+        }
+        final RelDataType rowRelDataType = typeFactory.createStructType(fieldRelDataTypes, aliases);
+
+        final RexNode call = rexBuilder.makeCall(rowRelDataType, operator, operandList);
+        final RelNode functionScan =
+                LogicalTableFunctionScan.create(
+                        relBuilder.getCluster(),
+                        inputs,
+                        call,
+                        null,
+                        rowRelDataType,
+                        Collections.emptySet());
+        return relBuilder.push(functionScan);
+    }
+
     public RelBuilder expand(List<List<RexNode>> projects, int expandIdIndex) {
         final RelNode input = build();
         final RelNode expand = expandFactory.createExpand(input, projects, expandIdIndex);
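
A usage sketch of the new helper, assuming `relBuilder`, a table-function
`sqlFunction`, and an `operands` list are already in scope; the aliases become
the field names of the produced row type (the real call sites in
QueryOperationConverter and SetOpRewriteUtil follow below):

    // Hedged sketch: push a function scan with explicit field aliases.
    FlinkRelBuilder.pushFunctionScan(
      relBuilder,
      sqlFunction,   // must be a BridgingSqlFunction.WithTableFunction
      0,             // number of inputs popped from the builder stack
      operands,      // the RexNode arguments of the call
      java.util.Arrays.asList("word", "len"))
    val scan = relBuilder.build()
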
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java
index 5dbe4d8..d661787 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.FunctionIdentifier;
 import org.apache.flink.table.functions.FunctionKind;
 import org.apache.flink.table.planner.calcite.FlinkContext;
+import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.utils.ShortcutUtils;
 import org.apache.flink.table.types.DataType;
@@ -35,6 +36,10 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlTableFunction;
+import org.apache.calcite.sql.type.SqlReturnTypeInference;
+import org.apache.calcite.tools.RelBuilder;
 
 import java.util.List;
 
@@ -52,7 +57,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * (either a system or user-defined function).
  */
 @Internal
-public final class BridgingSqlFunction extends SqlFunction {
+public class BridgingSqlFunction extends SqlFunction {
 
     private final DataTypeFactory dataTypeFactory;
 
@@ -108,6 +113,10 @@ public final class BridgingSqlFunction extends SqlFunction {
                 functionKind == FunctionKind.SCALAR || functionKind == FunctionKind.TABLE,
                 "Scalar or table function kind expected.");
 
+        if (functionKind == FunctionKind.TABLE) {
+            return new BridgingSqlFunction.WithTableFunction(
+                    dataTypeFactory, typeFactory, kind, resolvedFunction, typeInference);
+        }
         return new BridgingSqlFunction(
                 dataTypeFactory, typeFactory, kind, resolvedFunction, typeInference);
     }
@@ -177,4 +186,32 @@ public final class BridgingSqlFunction extends SqlFunction {
     public boolean isDeterministic() {
         return resolvedFunction.getDefinition().isDeterministic();
     }
+
+    // --------------------------------------------------------------------------------------------
+    // Table function extension
+    // --------------------------------------------------------------------------------------------
+
+    /** Special flavor of {@link BridgingSqlFunction} to indicate a table function to Calcite. */
+    public static class WithTableFunction extends BridgingSqlFunction implements SqlTableFunction {
+
+        private WithTableFunction(
+                DataTypeFactory dataTypeFactory,
+                FlinkTypeFactory typeFactory,
+                SqlKind kind,
+                ContextResolvedFunction resolvedFunction,
+                TypeInference typeInference) {
+            super(dataTypeFactory, typeFactory, kind, resolvedFunction, typeInference);
+        }
+
+        /**
+         * The conversion to a row type is handled on the caller side. This allows us to perform
+         * it in a SQL/Table API-specific way. This is particularly important for setting the
+         * field aliases correctly (see {@link FlinkRelBuilder#pushFunctionScan(RelBuilder,
+         * SqlOperator, int, Iterable, List)}).
+         */
+        @Override
+        public SqlReturnTypeInference getRowTypeInference() {
+            return getReturnTypeInference();
+        }
+    }
 }
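
The subclass only adds Calcite's SqlTableFunction marker interface; the factory
method above picks the flavor based on the function kind, which is exactly what
`pushFunctionScan` checks for. A brief hedged sketch (`cluster` and
`resolvedFunction` assumed in scope):

    // FunctionKind.TABLE now yields the SqlTableFunction flavor:
    val fn = BridgingSqlFunction.of(cluster, resolvedFunction)
    assert(fn.isInstanceOf[SqlTableFunction])
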
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
index 837b3b2..3fcf62fa 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
@@ -305,10 +305,14 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod
             final BridgingSqlFunction sqlFunction =
                     BridgingSqlFunction.of(relBuilder.getCluster(), resolvedFunction);
 
-            return relBuilder
-                    .functionScan(sqlFunction, 0, parameters)
-                    .rename(calculatedTable.getResolvedSchema().getColumnNames())
-                    .build();
+            FlinkRelBuilder.pushFunctionScan(
+                    relBuilder,
+                    sqlFunction,
+                    0,
+                    parameters,
+                    calculatedTable.getResolvedSchema().getColumnNames());
+
+            return relBuilder.build();
         }
 
         private RelNode convertLegacyTableFunction(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/SetOpRewriteUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/SetOpRewriteUtil.scala
index eb6d422..32e400f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/SetOpRewriteUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/SetOpRewriteUtil.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.utils
 
 
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions
+import org.apache.flink.table.planner.calcite.FlinkRelBuilder
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
 
 import org.apache.calcite.plan.RelOptUtil
@@ -73,12 +74,15 @@ object SetOpRewriteUtil {
     val cluster = relBuilder.getCluster
 
     val sqlFunction = BridgingSqlFunction.of(
-      relBuilder.getCluster,
+      cluster,
       BuiltInFunctionDefinitions.INTERNAL_REPLICATE_ROWS)
 
-    relBuilder
-      .functionScan(sqlFunction, 0, relBuilder.fields(Util.range(fields.size() + 1)))
-      .rename(outputRelDataType.getFieldNames)
+    FlinkRelBuilder.pushFunctionScan(
+      relBuilder,
+      sqlFunction,
+      0,
+      relBuilder.fields(Util.range(fields.size() + 1)),
+      outputRelDataType.getFieldNames)
 
     // correlated join
     val corSet = Collections.singleton(cluster.createCorrel())
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
index 26553c9..a241cbe 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
@@ -846,8 +846,7 @@ public class FunctionITCase extends StreamingTestBase {
                                 tEnv().explainSql(
                                                 "INSERT INTO SinkTable "
                                                         + "SELECT * FROM TABLE(MD5('3'))"))
-                .hasMessageContaining(
-                        "Currently, only table functions can be used in a correlate operation.");
+                .hasMessageContaining("Argument must be a table function: MD5");
     }
 
     @Test
@@ -864,8 +863,7 @@ public class FunctionITCase extends StreamingTestBase {
                                 tEnv().explainSql(
                                                 "INSERT INTO SinkTable "
                                                         + "SELECT RowTableFunction('test')"))
-                .hasMessageContaining(
-                        "Currently, only scalar functions can be used in a projection or filter operation.");
+                .hasMessageContaining("Cannot call table function here: 'RowTableFunction'");
     }
 
     @Test
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/FunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/FunctionITCase.java
index c792b6c..8096e1d 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/FunctionITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/FunctionITCase.java
@@ -28,26 +28,22 @@ import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory
 import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
 import org.apache.flink.types.Row;
 
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
 import static org.apache.flink.table.api.Expressions.$;
 import static org.apache.flink.table.api.Expressions.call;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for user defined functions in the Table API. */
 public class FunctionITCase extends StreamingTestBase {
 
-    @Rule public ExpectedException thrown = ExpectedException.none();
-
     @Test
-    public void testScalarFunction() throws Exception {
+    void testScalarFunction() throws Exception {
         final List<Row> sourceData =
                 Arrays.asList(Row.of(1, 1L, 1L), Row.of(2, 2L, 1L), Row.of(3, 3L, 1L));
 
@@ -60,7 +56,7 @@ public class FunctionITCase extends StreamingTestBase {
         tEnv().executeSql(
                         "CREATE TABLE TestTable(a INT, b BIGINT, c BIGINT) WITH ('connector' = 'COLLECTION')");
 
-        Table table =
+        final Table table =
                 tEnv().from("TestTable")
                         .select(
                                 $("a"),
@@ -74,7 +70,7 @@ public class FunctionITCase extends StreamingTestBase {
     }
 
     @Test
-    public void testJoinWithTableFunction() throws Exception {
+    void testJoinWithTableFunction() throws Exception {
         final List<Row> sourceData =
                 Arrays.asList(
                         Row.of("1,2,3"), Row.of("2,3,4"), Row.of("3,4,5"), Row.of((String) null));
@@ -102,23 +98,22 @@ public class FunctionITCase extends StreamingTestBase {
     }
 
     @Test
-    public void testLateralJoinWithScalarFunction() throws Exception {
-        thrown.expect(ValidationException.class);
-        thrown.expect(
-                hasMessage(
-                        containsString(
-                                "Currently, only table functions can be used in a correlate operation.")));
-
+    void testLateralJoinWithScalarFunction() throws Exception {
         TestCollectionTableFactory.reset();
         tEnv().executeSql("CREATE TABLE SourceTable(s STRING) WITH ('connector' = 'COLLECTION')");
         tEnv().executeSql(
                         "CREATE TABLE SinkTable(s STRING, sa ARRAY<STRING>) WITH ('connector' = 'COLLECTION')");
 
-        tEnv().from("SourceTable")
-                .joinLateral(call(new RowScalarFunction(), $("s")).as("a", "b"))
-                .select($("a"), $("b"))
-                .executeInsert("SinkTable")
-                .await();
+        assertThatThrownBy(
+                        () -> {
+                            tEnv().from("SourceTable")
+                                    .joinLateral(
+                                            call(new RowScalarFunction(), $("s")).as("a", "b"));
+                        })
+                .satisfies(
+                        anyCauseMatches(
+                                ValidationException.class,
+                                "A lateral join only accepts an expression which defines a table function"));
     }
 
     // --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
index 95f6e51..9d7d559 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.runtime.utils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple12;
+import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.functions.TableFunction;
 
@@ -34,7 +35,10 @@ public class JavaUserDefinedTableFunctions {
 
     /** Emit inputs as long. */
     public static class JavaTableFunc0 extends TableFunction<Long> {
-        public void eval(Integer a, Long b, TimestampData c) {
+        public void eval(
+                @DataTypeHint("DATE") Integer a,
+                Long b,
+                @DataTypeHint("TIMESTAMP(0)") TimestampData c) {
             collect(a.longValue());
             collect(b);
             collect(c.getMillisecond());
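
The hints are needed because the function now goes through the new type
inference: @DataTypeHint("DATE") maps the Integer argument to a DATE (days
since epoch), and the internal TimestampData class cannot be extracted without
an explicit hint. A hedged usage sketch with an assumed `tEnv`:

    // Derived SQL signature: func0(DATE, BIGINT, TIMESTAMP(0))
    tEnv.createTemporarySystemFunction("func0", new JavaTableFunc0)
    tEnv.sqlQuery(
      "SELECT * FROM TABLE(func0(DATE '1990-01-01', CAST(2 AS BIGINT), " +
        "TIMESTAMP '2000-01-01 00:00:00'))")
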
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml
index d34b6e8..7b89148 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SetOperatorsTest.xml
@@ -57,8 +57,8 @@ LogicalIntersect(all=[true])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[f0 AS c])
-+- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1($f0,c))], select=[$f0,c,f0], rowType=[RecordType(BIGINT $f0, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER])
+Calc(select=[c0 AS c])
++- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1($f0,c))], select=[$f0,c,c0], rowType=[RecordType(BIGINT $f0, VARCHAR(2147483647) c, VARCHAR(2147483647) c0)], joinType=[INNER])
    +- Calc(select=[IF((vcol_left_cnt > vcol_right_cnt), vcol_right_cnt, vcol_left_cnt) AS $f0, c], where=[((vcol_left_cnt >= 1) AND (vcol_right_cnt >= 1))])
       +- HashAggregate(isMerge=[true], groupBy=[c], select=[c, Final_COUNT(count$0) AS vcol_left_cnt, Final_COUNT(count$1) AS vcol_right_cnt])
          +- Exchange(distribution=[hash[c]])
@@ -180,8 +180,8 @@ LogicalMinus(all=[true])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[f0 AS c])
-+- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,c))], select=[sum_vcol_marker,c,f0], rowType=[RecordType(BIGINT sum_vcol_marker, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER])
+Calc(select=[c0 AS c])
++- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,c))], select=[sum_vcol_marker,c,c0], rowType=[RecordType(BIGINT sum_vcol_marker, VARCHAR(2147483647) c, VARCHAR(2147483647) c0)], joinType=[INNER])
    +- Calc(select=[sum_vcol_marker, c], where=[(sum_vcol_marker > 0)])
       +- HashAggregate(isMerge=[true], groupBy=[c], select=[c, Final_SUM(sum$0) AS sum_vcol_marker])
          +- Exchange(distribution=[hash[c]])
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.xml
index d6347f7..a2d90a0 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.xml
@@ -26,13 +26,13 @@ LogicalProject(c=[$0], d=[$1])
          +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
             :- LogicalProject(a=[$0], b=[$1], c=[$2])
             :  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-            +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;])
+            +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, d])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(TableFunc0(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc0*(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -42,12 +42,12 @@ Calc(select=[c, d])
       <![CDATA[
 LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
 :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-+- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], rowType=[RecordType(INTEGER x, INTEGER y)], elementType=[class [Ljava.lang.Object;])
++- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], rowType=[RecordType(INTEGER x, INTEGER y)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-PythonCorrelate(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], correlate=[table(MockPythonTableFunction(a,b))], select=[a,b,c,x,y], rowType=[RecordType(INTEGER a, INTEGER b, VARCHAR(2147483647) c, INTEGER x, INTEGER y)], joinType=[INNER])
+PythonCorrelate(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], correlate=[table(*org.apache.flink.table.planner.utils.MockPythonTableFunction*(a,b))], select=[a,b,c,x,y], rowType=[RecordType(INTEGER a, INTEGER b, VARCHAR(2147483647) c, INTEGER x, INTEGER y)], joinType=[INNER])
 +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -62,13 +62,13 @@ LogicalProject(c=[$0], d=[$1])
          +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
             :- LogicalProject(a=[$0], b=[$1], c=[$2])
             :  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-            +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;])
+            +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, d])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(TableFunc0(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc0*(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -79,13 +79,13 @@ Calc(select=[c, d])
 LogicalProject(c=[$2], s=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, s])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -96,13 +96,13 @@ Calc(select=[c, s])
 LogicalProject(c=[$2], s=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, s])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], correlate=[table(TableFunc1(c,'$'))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c,'$'))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -114,13 +114,13 @@ LogicalFilter(condition=[>($1, _UTF-16LE'')])
 +- LogicalProject(c=[$2], s=[$3])
    +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{}])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-      +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+      +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, s], where=[(s > '')])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -131,13 +131,13 @@ Calc(select=[c, s], where=[(s > '')])
 LogicalProject(c=[$2], s=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, s])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/SetOperatorsTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/SetOperatorsTest.xml
index 598ee0e..d31bda9 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/SetOperatorsTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/SetOperatorsTest.xml
@@ -33,8 +33,8 @@ Calc(select=[EXPR$0 AS a, b, EXPR$1 AS c])
 +- HashAggregate(isMerge=[true], groupBy=[b], select=[b, Final_SUM(sum$0) AS EXPR$0, Final_COUNT(count$1) AS EXPR$1])
    +- Exchange(distribution=[hash[b]])
       +- LocalHashAggregate(groupBy=[b], select=[b, Partial_SUM(a) AS sum$0, Partial_COUNT(c) AS count$1])
-         +- Calc(select=[f0 AS a, f1 AS b, f2 AS c])
-            +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,a,b,c))], select=[sum_vcol_marker,a,b,c,f0,f1,f2], rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER f0, BIGINT f1, VARCHAR(2147483647) f2)], joinType=[INNER])
+         +- Calc(select=[a0 AS a, b0 AS b, c0 AS c])
+            +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,a,b,c))], select=[sum_vcol_marker,a,b,c,a0,b0,c0], rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER a0, BIGINT b0, VARCHAR(2147483647) c0)], joinType=[INNER])
                +- Calc(select=[sum_vcol_marker, a, b, c], where=[(sum_vcol_marker > 0)])
                   +- HashAggregate(isMerge=[true], groupBy=[a, b, c], select=[a, b, c, Final_SUM(sum$0) AS sum_vcol_marker])
                      +- Exchange(distribution=[hash[a, b, c]])
@@ -123,8 +123,8 @@ LogicalProject(b=[$1], c=[$2])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[f0 AS b, f1 AS c])
-+- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,b,c))], select=[sum_vcol_marker,b,c,f0,f1], rowType=[RecordType(BIGINT sum_vcol_marker, BIGINT b, VARCHAR(2147483647) c, BIGINT f0, VARCHAR(2147483647) f1)], joinType=[INNER])
+Calc(select=[b0 AS b, c0 AS c])
++- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,b,c))], select=[sum_vcol_marker,b,c,b0,c0], rowType=[RecordType(BIGINT sum_vcol_marker, BIGINT b, VARCHAR(2147483647) c, BIGINT b0, VARCHAR(2147483647) c0)], joinType=[INNER])
    +- Calc(select=[sum_vcol_marker, b, c], where=[(sum_vcol_marker > 0)])
       +- HashAggregate(isMerge=[true], groupBy=[b, c], select=[b, c, Final_SUM(sum$0) AS sum_vcol_marker])
          +- Exchange(distribution=[hash[b, c]])
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml
index 067ed8e..1ede5ab 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/common/PartialInsertTest.xml
@@ -127,8 +127,8 @@ LogicalSink(table=[default_catalog.default_database.partitioned_sink], fields=[a
     <Resource name="optimized rel plan">
       <![CDATA[
 Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, d, e, f, g])
-+- Calc(select=[f0 AS a, f1 AS c, f2 AS d, f3 AS e, f4 AS f, f5 AS g])
-   +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,a,c,d,e,f,g))], select=[sum_vcol_marker,a,c,d,e,f,g,f0,f1,f2,f3,f4,f5], rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER f0, VARCHAR(2147483647) f1, VARCHAR(2147483647) f2, DOUBLE f3, BIGINT f4, INTEGER f5)], joinType=[INNER])
++- Calc(select=[a0 AS a, c0 AS c, d0 AS d, e0 AS e, f0 AS f, g0 AS g])
+   +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,a,c,d,e,f,g))], select=[sum_vcol_marker,a,c,d,e,f,g,a0,c0,d0,e0,f0,g0], rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER a0, VARCHAR(2147483647) c0, VARCHAR(2147483647) d0, DOUBLE e0, BIGINT f0, INTEGER g0)], joinType=[INNER])
       +- Calc(select=[sum_vcol_marker, a, c, d, e, f, g], where=[>(sum_vcol_marker, 0)])
          +- GroupAggregate(groupBy=[a, c, d, e, f, g], select=[a, c, d, e, f, g, SUM_RETRACT(vcol_marker) AS sum_vcol_marker])
             +- Exchange(distribution=[hash[a, c, d, e, f, g]])
@@ -165,8 +165,8 @@ LogicalSink(table=[default_catalog.default_database.partitioned_sink], fields=[a
       <![CDATA[
 Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, d, e, f, g])
 +- Sort(orderBy=[c ASC, d ASC])
-   +- Calc(select=[f0 AS a, f1 AS c, f2 AS d, f3 AS e, f4 AS f, f5 AS g])
-      +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,a,c,d,e,f,g))], select=[sum_vcol_marker,a,c,d,e,f,g,f0,f1,f2,f3,f4,f5], rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER f0, VARCHAR(2147483647) f1, VARCHAR(2147483647) f2, DOUBLE f3, BIGINT f4, INTEGER f5)], joinType=[INNER])
+   +- Calc(select=[a0 AS a, c0 AS c, d0 AS d, e0 AS e, f0 AS f, g0 AS g])
+      +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,a,c,d,e,f,g))], select=[sum_vcol_marker,a,c,d,e,f,g,a0,c0,d0,e0,f0,g0], rowType=[RecordType(BIGINT sum_vcol_marker, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER a0, VARCHAR(2147483647) c0, VARCHAR(2147483647) d0, DOUBLE e0, BIGINT f0, INTEGER g0)], joinType=[INNER])
          +- Calc(select=[sum_vcol_marker, a, c, d, e, f, g], where=[>(sum_vcol_marker, 0)])
             +- HashAggregate(isMerge=[true], groupBy=[a, c, d, e, f, g], select=[a, c, d, e, f, g, Final_SUM(sum$0) AS sum_vcol_marker])
                +- Exchange(distribution=[hash[a, c, d, e, f, g]])
@@ -205,8 +205,8 @@ LogicalSink(table=[default_catalog.default_database.partitioned_sink], fields=[a
     <Resource name="optimized rel plan">
       <![CDATA[
 Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, d, e, f, g])
-+- Calc(select=[f0 AS a, f1 AS c, f2 AS d, f3 AS e, f4 AS f, f5 AS g])
-   +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1($f0,a,c,d,e,f,g))], select=[$f0,a,c,d,e,f,g,f0,f1,f2,f3,f4,f5], rowType=[RecordType(BIGINT $f0, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER f0, VARCHAR(2147483647) f1, VARCHAR(2147483647) f2, DOUBLE f3, BIGINT f4, INTEGER f5)], joinType=[INNER])
++- Calc(select=[a0 AS a, c0 AS c, d0 AS d, e0 AS e, f0 AS f, g0 AS g])
+   +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1($f0,a,c,d,e,f,g))], select=[$f0,a,c,d,e,f,g,a0,c0,d0,e0,f0,g0], rowType=[RecordType(BIGINT $f0, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER a0, VARCHAR(2147483647) c0, VARCHAR(2147483647) d0, DOUBLE e0, BIGINT f0, INTEGER g0)], joinType=[INNER])
       +- Calc(select=[IF(>(vcol_left_cnt, vcol_right_cnt), vcol_right_cnt, vcol_left_cnt) AS $f0, a, c, d, e, f, g], where=[AND(>=(vcol_left_cnt, 1), >=(vcol_right_cnt, 1))])
          +- GroupAggregate(groupBy=[a, c, d, e, f, g], select=[a, c, d, e, f, g, COUNT_RETRACT(vcol_left_marker) AS vcol_left_cnt, COUNT_RETRACT(vcol_right_marker) AS vcol_right_cnt])
             +- Exchange(distribution=[hash[a, c, d, e, f, g]])
@@ -243,8 +243,8 @@ LogicalSink(table=[default_catalog.default_database.partitioned_sink], fields=[a
       <![CDATA[
 Sink(table=[default_catalog.default_database.partitioned_sink], fields=[a, c, d, e, f, g])
 +- Sort(orderBy=[c ASC, d ASC])
-   +- Calc(select=[f0 AS a, f1 AS c, f2 AS d, f3 AS e, f4 AS f, f5 AS g])
-      +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1($f0,a,c,d,e,f,g))], select=[$f0,a,c,d,e,f,g,f0,f1,f2,f3,f4,f5], rowType=[RecordType(BIGINT $f0, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER f0, VARCHAR(2147483647) f1, VARCHAR(2147483647) f2, DOUBLE f3, BIGINT f4, INTEGER f5)], joinType=[INNER])
+   +- Calc(select=[a0 AS a, c0 AS c, d0 AS d, e0 AS e, f0 AS f, g0 AS g])
+      +- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3, $4, $5, $6)], correlate=[table($REPLICATE_ROWS$1($f0,a,c,d,e,f,g))], select=[$f0,a,c,d,e,f,g,a0,c0,d0,e0,f0,g0], rowType=[RecordType(BIGINT $f0, INTEGER a, VARCHAR(2147483647) c, VARCHAR(2147483647) d, DOUBLE e, BIGINT f, INTEGER g, INTEGER a0, VARCHAR(2147483647) c0, VARCHAR(2147483647) d0, DOUBLE e0, BIGINT f0, INTEGER g0)], joinType=[INNER])
          +- Calc(select=[IF(>(vcol_left_cnt, vcol_right_cnt), vcol_right_cnt, vcol_left_cnt) AS $f0, a, c, d, e, f, g], where=[AND(>=(vcol_left_cnt, 1), >=(vcol_right_cnt, 1))])
             +- HashAggregate(isMerge=[true], groupBy=[a, c, d, e, f, g], select=[a, c, d, e, f, g, Final_COUNT(count$0) AS vcol_left_cnt, Final_COUNT(count$1) AS vcol_right_cnt])
                +- Exchange(distribution=[hash[a, c, d, e, f, g]])
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteIntersectAllRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteIntersectAllRuleTest.xml
index c5a3d2f..ad832c4 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteIntersectAllRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteIntersectAllRuleTest.xml
@@ -43,8 +43,7 @@ LogicalProject(c=[$2])
    :           +- LogicalProject(f=[$0], vcol_left_marker=[null:BOOLEAN], vcol_right_marker=[true])
    :              +- LogicalProject(f=[$2])
    :                 +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]])
-   +- LogicalProject(c=[$0])
-      +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+   +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)])
 ]]>
     </Resource>
   </TestCase>
@@ -77,8 +76,7 @@ LogicalProject(c=[$2])
    :           +- LogicalProject(f=[$0], vcol_left_marker=[null:BOOLEAN], vcol_right_marker=[true])
    :              +- LogicalProject(f=[$2])
    :                 +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]])
-   +- LogicalProject(c=[$0])
-      +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+   +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)])
 ]]>
     </Resource>
   </TestCase>
@@ -113,8 +111,7 @@ LogicalProject(c=[$2])
          :           +- LogicalProject(d=[$0], e=[$1], f=[$2], vcol_left_marker=[null:BOOLEAN], vcol_right_marker=[true])
          :              +- LogicalProject(d=[$0], e=[$1], f=[$2])
          :                 +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]])
-         +- LogicalProject(a=[$0], b=[$1], c=[$2])
-            +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3)], rowType=[RecordType:peek_no_expand(INTEGER f0, BIGINT f1, VARCHAR(2147483647) f2)])
+         +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)])
 ]]>
     </Resource>
   </TestCase>
@@ -147,8 +144,7 @@ LogicalProject(c=[$2])
    :              +- LogicalProject(f=[$2])
    :                 +- LogicalFilter(condition=[=(1, 0)])
    :                    +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]])
-   +- LogicalProject(c=[$0])
-      +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+   +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRuleTest.xml
index ae6d9dc..e895649 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/RewriteMinusAllRuleTest.xml
@@ -43,8 +43,7 @@ LogicalProject(c=[$2])
    :           +- LogicalProject(f=[$0], vcol_marker=[-1:BIGINT])
    :              +- LogicalProject(f=[$2])
    :                 +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]])
-   +- LogicalProject(c=[$0])
-      +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+   +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)])
 ]]>
     </Resource>
   </TestCase>
@@ -77,8 +76,7 @@ LogicalProject(c=[$2])
    :           +- LogicalProject(f=[$0], vcol_marker=[-1:BIGINT])
    :              +- LogicalProject(f=[$2])
    :                 +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]])
-   +- LogicalProject(c=[$0])
-      +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+   +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)])
 ]]>
     </Resource>
   </TestCase>
@@ -111,8 +109,7 @@ LogicalProject(c=[$2])
    :              +- LogicalProject(f=[$2])
    :                 +- LogicalFilter(condition=[=(1, 0)])
    :                    +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]])
-   +- LogicalProject(c=[$0])
-      +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0)])
+   +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1)], rowType=[RecordType(VARCHAR(2147483647) c)])
 ]]>
     </Resource>
   </TestCase>
@@ -147,8 +144,7 @@ LogicalProject(c=[$2])
          :           +- LogicalProject(d=[$0], e=[$1], f=[$2], vcol_marker=[-1:BIGINT])
          :              +- LogicalProject(d=[$0], e=[$1], f=[$2])
          :                 +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]])
-         +- LogicalProject(a=[$0], b=[$1], c=[$2])
-            +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3)], rowType=[RecordType:peek_no_expand(INTEGER f0, BIGINT f1, VARCHAR(2147483647) f2)])
+         +- LogicalTableFunctionScan(invocation=[$REPLICATE_ROWS$1($0, $1, $2, $3)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c)])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml
index 59d848b..61552b1 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SetOperatorsTest.xml
@@ -58,8 +58,8 @@ LogicalIntersect(all=[true])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[f0 AS c])
-+- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1($f0,c))], select=[$f0,c,f0], rowType=[RecordType(BIGINT $f0, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER])
+Calc(select=[c0 AS c])
++- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1($f0,c))], select=[$f0,c,c0], rowType=[RecordType(BIGINT $f0, VARCHAR(2147483647) c, VARCHAR(2147483647) c0)], joinType=[INNER])
    +- Calc(select=[IF((vcol_left_cnt > vcol_right_cnt), vcol_right_cnt, vcol_left_cnt) AS $f0, c], where=[((vcol_left_cnt >= 1) AND (vcol_right_cnt >= 1))])
       +- GroupAggregate(groupBy=[c], select=[c, COUNT(vcol_left_marker) AS vcol_left_cnt, COUNT(vcol_right_marker) AS vcol_right_cnt])
          +- Exchange(distribution=[hash[c]])
@@ -182,8 +182,8 @@ LogicalMinus(all=[true])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[f0 AS c])
-+- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,c))], select=[sum_vcol_marker,c,f0], rowType=[RecordType(BIGINT sum_vcol_marker, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER])
+Calc(select=[c0 AS c])
++- Correlate(invocation=[$REPLICATE_ROWS$1($0, $1)], correlate=[table($REPLICATE_ROWS$1(sum_vcol_marker,c))], select=[sum_vcol_marker,c,c0], rowType=[RecordType(BIGINT sum_vcol_marker, VARCHAR(2147483647) c, VARCHAR(2147483647) c0)], joinType=[INNER])
    +- Calc(select=[sum_vcol_marker, c], where=[(sum_vcol_marker > 0)])
       +- GroupAggregate(groupBy=[c], select=[c, SUM(vcol_marker) AS sum_vcol_marker])
          +- Exchange(distribution=[hash[c]])
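
The two plans above show how the planner rewrites INTERSECT ALL and MINUS ALL: virtual marker columns are attached to both inputs, a GroupAggregate counts or sums the markers per key, and the surviving count drives the REPLICATE_ROWS table function, whose output columns are now named after the original fields (c0) instead of the positional f0. A minimal sketch that produces such a plan, assuming a StreamTableEnvironment tEnv with tables T1 and T2 (each with a STRING column c) already registered:

    val result = tEnv.sqlQuery(
      """
        |SELECT c FROM T1
        |INTERSECT ALL
        |SELECT c FROM T2
        |""".stripMargin)
    // the printed plan contains the GroupAggregate + Correlate($REPLICATE_ROWS$1) rewrite
    println(result.explain())
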
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ColumnFunctionsTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ColumnFunctionsTest.xml
index 1c8dab3..063fcca 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ColumnFunctionsTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/ColumnFunctionsTest.xml
@@ -153,12 +153,12 @@ Join(joinType=[InnerJoin], where=[(int1 = int2)], select=[int1, long1, string1,
       <![CDATA[
 LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
 :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(int, long, string)]]])
-+- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age)], elementType=[class [Ljava.lang.Object;])
++- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(TableFunc0(string))], select=[int,long,string,name,age], rowType=[RecordType(DOUBLE int, BIGINT long, VARCHAR(2147483647) string, VARCHAR(2147483647) name, INTEGER age)], joinType=[INNER])
+Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc0*(string))], select=[int,long,string,name,age], rowType=[RecordType(DOUBLE int, BIGINT long, VARCHAR(2147483647) string, VARCHAR(2147483647) name, INTEGER age)], joinType=[INNER])
 +- LegacyTableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(int, long, string)]]], fields=[int, long, string])
 ]]>
     </Resource>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml
index cbee6a2..2542e8d 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml
@@ -21,12 +21,12 @@ limitations under the License.
       <![CDATA[
 LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
 :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-+- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], rowType=[RecordType(INTEGER x, INTEGER y)], elementType=[class [Ljava.lang.Object;])
++- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], rowType=[RecordType(INTEGER x, INTEGER y)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-PythonCorrelate(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], correlate=[table(MockPythonTableFunction(a,b))], select=[a,b,c,x,y], rowType=[RecordType(INTEGER a, INTEGER b, VARCHAR(2147483647) c, INTEGER x, INTEGER y)], joinType=[INNER])
+PythonCorrelate(invocation=[*org.apache.flink.table.planner.utils.MockPythonTableFunction*($0, $1)], correlate=[table(*org.apache.flink.table.planner.utils.MockPythonTableFunction*(a,b))], select=[a,b,c,x,y], rowType=[RecordType(INTEGER a, INTEGER b, VARCHAR(2147483647) c, INTEGER x, INTEGER y)], joinType=[INNER])
 +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -63,13 +63,13 @@ LogicalProject(c=[$0], d=[$1])
          +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
             :- LogicalProject(a=[$0], b=[$1], c=[$2])
             :  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-            +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;])
+            +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, d])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(TableFunc0(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc0*(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -84,13 +84,13 @@ LogicalProject(c=[$0], d=[$1])
          +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
             :- LogicalProject(a=[$0], b=[$1], c=[$2])
             :  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-            +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;])
+            +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, d])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(TableFunc0(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc0*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc0*(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -101,13 +101,13 @@ Calc(select=[c, d])
 LogicalProject(c=[$2], s=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, s])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -118,13 +118,13 @@ Calc(select=[c, s])
 LogicalProject(c=[$2], s=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, s])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], correlate=[table(TableFunc1(c,'$'))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2, _UTF-16LE'$')], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c,'$'))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -135,13 +135,13 @@ Calc(select=[c, s])
 LogicalProject(c=[$2], name=[$3], len=[$4])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*(*org.apache.flink.table.planner.expressions.utils.Func13$aceadf1af6c698a4705a8fbd3984d0a3*($2))], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*(*org.apache.flink.table.planner.expressions.utils.Func13$aceadf1af6c698a4705a8fbd3984d0a3*($2))], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, name, len])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*(*org.apache.flink.table.planner.expressions.utils.Func13$aceadf1af6c698a4705a8fbd3984d0a3*($2))], correlate=[table(TableFunc2(*org.apache.flink.table.planner.expressions.utils.Func13$aceadf1af6c698a4705a8fbd3984d0a3*(c)))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*(*org.apache.flink.table.planner.expressions.utils.Func13$aceadf1af6c698a4705a8fbd3984d0a3*($2))], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc2*(*org.apache.flink.table.planner.expressions.utils.Func13$aceadf1af6c698a4705a8fbd3984d0a3*(c)))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -153,13 +153,13 @@ LogicalFilter(condition=[>($2, 2)])
 +- LogicalProject(c=[$2], name=[$3], len=[$4])
    +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-      +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;])
+      +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, name, len])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], correlate=[table(TableFunc2(c))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER], condition=[>($1, 2)])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc2*(c))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER], condition=[>($1, 2)])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -170,14 +170,13 @@ Calc(select=[c, name, len])
 LogicalProject(f0=[AS($3, _UTF-16LE'f0')], f1=[AS($4, _UTF-16LE'f1')])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(f1, f2, f3)]]])
-   +- LogicalProject(f0=[$0], f1_0=[$1])
-      +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) f0, INTEGER f1)])
+   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], rowType=[RecordType(VARCHAR(2147483647) f0, INTEGER f1_0)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Calc(select=[f0, f10 AS f1])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc2*(f3))], select=[f1,f2,f3,f0,f10], rowType=[RecordType(INTEGER f1, BIGINT f2, VARCHAR(2147483647) f3, VARCHAR(2147483647) f0, INTEGER f10)], joinType=[INNER])
+Calc(select=[f0, f1_0 AS f1])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc2*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc2*(f3))], select=[f1,f2,f3,f0,f1_0], rowType=[RecordType(INTEGER f1, BIGINT f2, VARCHAR(2147483647) f3, VARCHAR(2147483647) f0, INTEGER f1_0)], joinType=[INNER])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(f1, f2, f3)]]], fields=[f1, f2, f3])
 ]]>
     </Resource>
@@ -209,12 +208,12 @@ Correlate(invocation=[str_split(_UTF-16LE'Jack,John')], correlate=[table(str_spl
       <![CDATA[
 LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
 :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-+- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.HierarchyTableFunction*($2)], rowType=[RecordType(VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], elementType=[class [Ljava.lang.Object;])
++- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.HierarchyTableFunction*($2)], rowType=[RecordType(VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Correlate(invocation=[*org.apache.flink.table.planner.utils.HierarchyTableFunction*($2)], correlate=[table(HierarchyTableFunction(c))], select=[a,b,c,name,adult,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], joinType=[INNER])
+Correlate(invocation=[*org.apache.flink.table.planner.utils.HierarchyTableFunction*($2)], correlate=[table(*org.apache.flink.table.planner.utils.HierarchyTableFunction*(c))], select=[a,b,c,name,adult,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], joinType=[INNER])
 +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -291,13 +290,13 @@ Correlate(invocation=[str_split(_UTF-16LE'Jack,John', _UTF-16LE',')], correlate=
 LogicalProject(c=[$2], s=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], rowType=[RecordType(VARCHAR(2147483647) s)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, s])
-+- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
++- Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*($2)], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
    +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -307,12 +306,12 @@ Calc(select=[c, s])
       <![CDATA[
 LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
 :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-+- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING($2, 2))], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
++- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING($2, 2))], rowType=[RecordType(VARCHAR(2147483647) s)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING($2, 2))], correlate=[table(TableFunc1(SUBSTRING(c, 2)))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
+Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING($2, 2))], correlate=[table(*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRING(c, 2)))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
 +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -322,12 +321,12 @@ Correlate(invocation=[*org.apache.flink.table.planner.utils.TableFunc1*(SUBSTRIN
       <![CDATA[
 LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
 :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-+- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], rowType=[RecordType(INTEGER age, VARCHAR(2147483647) name)], elementType=[class [Ljava.lang.Object;])
++- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Correlate(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], correlate=[table(PojoTableFunc(c))], select=[a,b,c,age,name], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER age, VARCHAR(2147483647) name)], joinType=[INNER])
+Correlate(invocation=[*org.apache.flink.table.planner.utils.PojoTableFunc*($2)], correlate=[table(*org.apache.flink.table.planner.utils.PojoTableFunc*(c))], select=[a,b,c,name,age], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER age)], joinType=[INNER])
 +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
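
The CorrelateTest plan updates reflect inline table functions now being resolved through BridgingSqlFunction: the correlate attribute prints the fully qualified class name, the legacy elementType attribute disappears from LogicalTableFunctionScan, and clashing output fields are disambiguated as f1_0 instead of f10. A minimal Table API sketch of the kind of query behind the field-renaming test, assuming TableFunc2 emits the columns f0 and f1:

    val util = streamTestUtil()
    val table = util.addTableSource[(Int, Long, String)]("MyTable", 'f1, 'f2, 'f3)
    val func = new TableFunc2
    // the function's f1 output collides with the source's f1 and surfaces as f1_0
    val result = table.joinLateral(func('f3)).select('f0, 'f1_0 as 'f1)
    util.verifyExecPlan(result)
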
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TemporalTableFunctionJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TemporalTableFunctionJoinTest.xml
index 0525902..47f118e 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TemporalTableFunctionJoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TemporalTableFunctionJoinTest.xml
@@ -25,7 +25,7 @@ LogicalJoin(condition=[=($3, $1)], joinType=[inner])
 :     +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
 :        :- LogicalProject(o_rowtime=[AS($0, _UTF-16LE'o_rowtime')], o_comment=[AS($1, _UTF-16LE'o_comment')], o_amount=[AS($2, _UTF-16LE'o_amount')], o_currency=[AS($3, _UTF-16LE'o_currency')], o_secondary_key=[AS($4, _UTF-16LE'o_secondary_key')])
 :        :  +- LogicalTableScan(table=[[default_catalog, default_database, Orders]])
-:        +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$bbf912e58a3fb2d083552961c0f87dbe*($0)], rowType=[RecordType(TIMESTAMP(3) *ROWTIME* rowtime, VARCHAR(2147483647) comment, VARCHAR(2147483647) currency, INTEGER rate, INTEGER secondary_key)], elementType=[class [Ljava.lang.Object;])
+:        +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$bbf912e58a3fb2d083552961c0f87dbe*($0)], rowType=[RecordType(TIMESTAMP(3) *ROWTIME* rowtime, VARCHAR(2147483647) comment, VARCHAR(2147483647) currency, INTEGER rate, INTEGER secondary_key)])
 +- LogicalTableScan(table=[[default_catalog, default_database, ThirdTable]])
 ]]>
     </Resource>
@@ -53,7 +53,7 @@ LogicalProject(rate=[AS(*($0, $4), _UTF-16LE'rate')])
 +- LogicalFilter(condition=[=($3, $1)])
    +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
       :- LogicalTableScan(table=[[default_catalog, default_database, Orders]])
-      +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$225833b9896cb7740aed9150b8cc7fd9*($2)], rowType=[RecordType(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP(3) *ROWTIME* rowtime)], elementType=[class [Ljava.lang.Object;])
+      +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$225833b9896cb7740aed9150b8cc7fd9*($2)], rowType=[RecordType(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP(3) *ROWTIME* rowtime)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -74,7 +74,7 @@ LogicalProject(rate=[AS(*($0, $4), _UTF-16LE'rate')])
 +- LogicalFilter(condition=[=($3, $1)])
    +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
       :- LogicalTableScan(table=[[default_catalog, default_database, Orders]])
-      +- LogicalTableFunctionScan(invocation=[Rates($2)], rowType=[RecordType:peek_no_expand(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP(3) *ROWTIME* rowtime)])
+      +- LogicalTableFunctionScan(invocation=[Rates($2)], rowType=[RecordType(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP(3) *ROWTIME* rowtime)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -95,7 +95,7 @@ LogicalProject(rate=[AS(*($0, $4), _UTF-16LE'rate')])
 +- LogicalFilter(condition=[=($3, $1)])
    +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
       :- LogicalTableScan(table=[[default_catalog, default_database, ProctimeOrders]])
-      +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$170cc46c47df69784f267e43f61e8e9d*($2)], rowType=[RecordType(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP_LTZ(3) *PROCTIME* proctime)], elementType=[class [Ljava.lang.Object;])
+      +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$170cc46c47df69784f267e43f61e8e9d*($2)], rowType=[RecordType(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP_LTZ(3) *PROCTIME* proctime)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
@@ -117,7 +117,7 @@ LogicalProject(rate=[AS(*($0, $4), _UTF-16LE'rate')])
 +- LogicalFilter(condition=[=($3, $1)])
    +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
       :- LogicalTableScan(table=[[default_catalog, default_database, Orders]])
-      +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$225833b9896cb7740aed9150b8cc7fd9*($2)], rowType=[RecordType(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP(3) *ROWTIME* rowtime)], elementType=[class [Ljava.lang.Object;])
+      +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.TemporalTableFunctionImpl$225833b9896cb7740aed9150b8cc7fd9*($2)], rowType=[RecordType(VARCHAR(2147483647) currency, INTEGER rate, TIMESTAMP(3) *ROWTIME* rowtime)])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.scala
index 14348ae..8ebaee6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/CorrelateTest.scala
@@ -34,8 +34,6 @@ class CorrelateTest extends TableTestBase {
     val util = batchTestUtil()
     val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val func = new TableFunc1
-    util.addFunction("func1", func)
-
     val result1 = table.joinLateral(func('c) as 's).select('c, 's)
 
     util.verifyExecPlan(result1)
@@ -46,8 +44,6 @@ class CorrelateTest extends TableTestBase {
     val util = batchTestUtil()
     val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val func = new TableFunc1
-    util.addFunction("func1", func)
-
     val result2 = table.joinLateral(func('c, "$") as 's).select('c, 's)
     util.verifyExecPlan(result2)
   }
@@ -57,8 +53,6 @@ class CorrelateTest extends TableTestBase {
     val util = batchTestUtil()
     val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val func = new TableFunc1
-    util.addFunction("func1", func)
-
     val result = table.leftOuterJoinLateral(func('c) as 's).select('c, 's).where('s > "")
     util.verifyExecPlan(result)
   }
@@ -68,8 +62,6 @@ class CorrelateTest extends TableTestBase {
     val util = batchTestUtil()
     val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val func = new TableFunc1
-    util.addFunction("func1", func)
-
     val result = table.leftOuterJoinLateral(func('c) as 's, true).select('c, 's)
     util.verifyExecPlan(result)
   }
@@ -79,8 +71,6 @@ class CorrelateTest extends TableTestBase {
     val util = batchTestUtil()
     val sourceTable = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val func = new TableFunc0
-    util.addFunction("func1", func)
-
     val result = sourceTable.select('a, 'b, 'c)
       .joinLateral(func('c) as('d, 'e))
       .select('c, 'd, 'e)
@@ -106,8 +96,6 @@ class CorrelateTest extends TableTestBase {
 
     val sourceTable = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val func = new TableFunc0
-    util.addFunction("func1", func)
-
     val result = sourceTable.select('a, 'b, 'c)
       .joinLateral(func('c) as('d, 'e))
       .select('c, 'd, 'e)
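
The deleted util.addFunction("func1", func) lines are the user-visible effect of this change: with the Scala implicits now backed by BridgingSqlFunction, an inline TableFunction instance can be used in joinLateral directly, without registering it in the catalog first. A minimal sketch in the style of these tests:

    val util = batchTestUtil()
    val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
    val func = new TableFunc1 // no addFunction registration required anymore
    util.verifyExecPlan(table.joinLateral(func('c) as 's).select('c, 's))
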
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/validation/CorrelateValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/validation/CorrelateValidationTest.scala
index d69383e..611928e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/validation/CorrelateValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/table/validation/CorrelateValidationTest.scala
@@ -36,7 +36,6 @@ class CorrelateValidationTest extends TableTestBase {
     val util = batchTestUtil()
     val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val func = new TableFunc1
-    util.addFunction("func1", func)
     val result = table
       .leftOuterJoinLateral(func('c) as 's, 'c === 's)
       .select('c, 's)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala
index 339e7ca..ff1be68 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala
@@ -36,8 +36,6 @@ class CorrelateTest extends TableTestBase {
     val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
     val function = new TableFunc1
-    util.addFunction("func1", function)
-
     val result1 = table.joinLateral(function('c) as 's).select('c, 's)
     util.verifyExecPlan(result1)
   }
@@ -48,7 +46,6 @@ class CorrelateTest extends TableTestBase {
     val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
 
     val function = new TableFunc1
-    util.addFunction("func1", function)
     // test overloading
     val result2 = table.joinLateral(function('c, "$") as 's).select('c, 's)
     util.verifyExecPlan(result2)
@@ -59,8 +56,6 @@ class CorrelateTest extends TableTestBase {
     val util = streamTestUtil()
     val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val function = new TableFunc1
-    util.addFunction("func1", function)
-
     val result = table.leftOuterJoinLateral(function('c) as 's, true).select('c, 's)
     util.verifyExecPlan(result)
   }
@@ -70,7 +65,6 @@ class CorrelateTest extends TableTestBase {
     val util = streamTestUtil()
     val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val function = new TableFunc2
-    util.addFunction("func2", function)
     val scalarFunc = new Func13("pre")
 
     val result = table.joinLateral(
@@ -84,8 +78,6 @@ class CorrelateTest extends TableTestBase {
     val util = streamTestUtil()
     val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val function = new HierarchyTableFunction
-    util.addFunction("hierarchy", function)
-
     val result = table.joinLateral(function('c) as ('name, 'adult, 'len))
     util.verifyExecPlan(result)
   }
@@ -95,8 +87,6 @@ class CorrelateTest extends TableTestBase {
     val util = streamTestUtil()
     val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val function = new PojoTableFunc
-    util.addFunction("pojo", function)
-
     val result = table.joinLateral(function('c))
     util.verifyExecPlan(result)
   }
@@ -106,8 +96,6 @@ class CorrelateTest extends TableTestBase {
     val util = streamTestUtil()
     val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val function = new TableFunc2
-    util.addFunction("func2", function)
-
     val result = table
       .joinLateral(function('c) as ('name, 'len))
       .select('c, 'name, 'len)
@@ -120,8 +108,6 @@ class CorrelateTest extends TableTestBase {
     val util = streamTestUtil()
     val table = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val function = new TableFunc1
-    util.addFunction("func1", function)
-
     val result = table.joinLateral(function('c.substring(2)) as 's)
     util.verifyExecPlan(result)
   }
@@ -131,8 +117,6 @@ class CorrelateTest extends TableTestBase {
     val util = streamTestUtil()
     val sourceTable = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val function = new TableFunc0
-    util.addFunction("func1", function)
-
     val result = sourceTable.select('a, 'b, 'c)
       .joinLateral(function('c) as('d, 'e))
       .select('c, 'd, 'e)
@@ -158,7 +142,6 @@ class CorrelateTest extends TableTestBase {
 
     val sourceTable = util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
     val function = new TableFunc0
-    util.addFunction("func1", function)
     val result = sourceTable.select('a, 'b, 'c)
       .joinLateral(function('c) as('d, 'e))
       .select('c, 'd, 'e)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/CorrelateValidationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/CorrelateValidationTest.scala
index ec2401f..a991cc5 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/CorrelateValidationTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/CorrelateValidationTest.scala
@@ -68,11 +68,10 @@ class CorrelateValidationTest extends TableTestBase {
     util.addFunction("func0", Func0)
 
     // SQL API call
-    // NOTE: it doesn't throw an exception but an AssertionError, maybe a Calcite bug
     expectExceptionThrown(
       util.tableEnv.sqlQuery("SELECT * FROM MyTable, LATERAL TABLE(func0(a))"),
       null,
-      classOf[AssertionError])
+      classOf[ValidationException])
 
     //========== throw exception when the parameters is not correct ===============
     // Java Table API call
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CorrelateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CorrelateITCase.scala
index 17a35c2..a125f93 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CorrelateITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CorrelateITCase.scala
@@ -222,7 +222,8 @@ class CorrelateITCase extends BatchTestBase {
       'a.cast(DataTypes.TINYINT) as 'a,
       'a.cast(DataTypes.SMALLINT) as 'b,
       'b.cast(DataTypes.FLOAT) as 'c)
-        .joinLateral(tFunc('a, 'b, 'c) as ('a2, 'b2, 'c2))
+        .joinLateral(
+          tFunc('a.ifNull(0.toByte), 'b.ifNull(0.toShort), 'c.ifNull(0.toFloat)) as ('a2, 'b2, 'c2))
 
     val results = executeQuery(result)
     val expected = Seq(
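
The added ifNull calls track the stricter type inference of the new function stack: TableFunc4.eval declares primitive Byte, Short and Float parameters, which map to NOT NULL data types, so the nullable cast results need a default value before the call. A sketch of the pattern with a hypothetical one-argument function:

    import org.apache.flink.table.functions.TableFunction

    @SerialVersionUID(1L)
    class PrimitiveArgFunc extends TableFunction[String] {
      // a Scala Byte parameter is inferred as TINYINT NOT NULL
      def eval(b: Byte): Unit = collect("Byte=" + b)
    }

    val func = new PrimitiveArgFunc
    // coalesce the nullable column so it satisfies the NOT NULL parameter
    table.joinLateral(func('a.ifNull(0.toByte)) as 's)
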
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala
index 9f106c4..683543b 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.types.Row
 
+import org.junit.jupiter.api.{AfterEach, BeforeEach}
 import org.junit.rules.{ExpectedException, TemporaryFolder}
 import org.junit.{After, Before, Rule}
 
@@ -44,6 +45,7 @@ class StreamingTestBase extends AbstractTestBase {
   def tempFolder: TemporaryFolder = _tempFolder
 
   @Before
+  @BeforeEach
   def before(): Unit = {
     this.env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setParallelism(4)
@@ -55,6 +57,7 @@ class StreamingTestBase extends AbstractTestBase {
   }
 
   @After
+  @AfterEach
   def after(): Unit = {
     StreamTestSink.clear()
     TestValuesTableFactory.clearAllData()
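
StreamingTestBase now carries the lifecycle annotations of both JUnit 4 and JUnit 5, so subclasses keep working while the code base migrates: JUnit 4 honors @Before/@After, JUnit 5 honors @BeforeEach/@AfterEach, and each runner ignores the other's annotations. The same dual-annotation pattern in isolation (class name is illustrative):

    import org.junit.jupiter.api.{AfterEach, BeforeEach}
    import org.junit.{After, Before}

    class DualLifecycleBase {
      @Before
      @BeforeEach
      def before(): Unit = {
        // runs once per test under either JUnit 4 or JUnit 5
      }

      @After
      @AfterEach
      def after(): Unit = {
        // mirror the cleanup for both runners
      }
    }
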
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala
index f087d5a..6fcad3d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/UserDefinedTableFunctions.scala
@@ -29,8 +29,6 @@ import org.apache.flink.types.Row
 
 import org.junit.Assert
 
-import java.lang.Boolean
-
 import scala.annotation.varargs
 
 
@@ -117,9 +115,10 @@ class TableFunc3(data: String, conf: Map[String, String]) extends TableFunction[
 }
 
 @SerialVersionUID(1L)
+@DataTypeHint("ROW<x INT, y INT>")
 class MockPythonTableFunction extends TableFunction[Row] with PythonFunction {
 
-  def eval(x: Int, y: Int) = ???
+  def eval(x: java.lang.Integer, y: java.lang.Integer) = ???
 
   override def getResultType: TypeInformation[Row] =
     new RowTypeInfo(Types.INT, Types.INT)
@@ -368,28 +367,19 @@ class MockPythonTableFunction extends TableFunction[Row] with PythonFunction {
 //}
 
 @SerialVersionUID(1L)
+@DataTypeHint("ROW<f0 STRING, f1 STRING, f2 STRING>")
 class TableFunc4 extends TableFunction[Row] {
   def eval(b: Byte, s: Short, f: Float): Unit = {
     collect(Row.of("Byte=" + b, "Short=" + s, "Float=" + f))
   }
-
-  override def getResultType: TypeInformation[Row] = {
-    new RowTypeInfo(Types.STRING, Types.STRING, Types.STRING)
-  }
 }
 
 @SerialVersionUID(1L)
+@DataTypeHint("ROW<a INT, b INT, c INT>")
 class TableFunc6 extends TableFunction[Row] {
-  def eval(row: Row): Unit = {
+  def eval(@DataTypeHint("ROW<a INT, b INT, c INT>") row: Row): Unit = {
     collect(row)
   }
-
-  override def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] =
-    Array(new RowTypeInfo(Types.INT, Types.INT, Types.INT))
-
-  override def getResultType: TypeInformation[Row] = {
-    new RowTypeInfo(Types.INT, Types.INT, Types.INT)
-  }
 }
 
 @SerialVersionUID(1L)
@@ -421,12 +411,12 @@ class VarArgsFunc0 extends TableFunction[String] {
 }
 
 @SerialVersionUID(1L)
-class HierarchyTableFunction extends SplittableTableFunction[Boolean, Integer] {
+class HierarchyTableFunction extends SplittableTableFunction[java.lang.Boolean, Integer] {
   def eval(user: String) {
     if (user.contains("#")) {
       val splits = user.split("#")
       val age = splits(1).toInt
-      collect(new Tuple3[String, Boolean, Integer](splits(0), age >= 20, age))
+      collect(new Tuple3[String, java.lang.Boolean, Integer](splits(0), age >= 20, age))
     }
   }
 }
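
The TableFunc4 and TableFunc6 changes show the migration from the legacy getResultType/getParameterTypes overrides to annotation-based type inference: a @DataTypeHint on the class declares the produced row type, and a parameter-level hint declares the expected input row. A minimal sketch of the annotated style (function name and fields are illustrative):

    import org.apache.flink.table.annotation.DataTypeHint
    import org.apache.flink.table.functions.TableFunction
    import org.apache.flink.types.Row

    @SerialVersionUID(1L)
    @DataTypeHint("ROW<name STRING, len INT>")
    class AnnotatedFunc extends TableFunction[Row] {
      def eval(s: String): Unit = {
        collect(Row.of(s, Int.box(s.length)))
      }
    }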

[flink] 01/03: [hotfix][table-planner] Deprecate SqlFunctions of old function stack

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3a2b8da7d9a114e09268f365d5e8796dfcbf8d17
Author: Timo Walther <tw...@apache.org>
AuthorDate: Fri Mar 18 10:30:29 2022 +0100

    [hotfix][table-planner] Deprecate SqlFunctions of old function stack
---
 .../flink/table/planner/functions/utils/ScalarSqlFunction.scala     | 5 +----
 .../table/planner/plan/schema/DeferredTypeFlinkTableFunction.scala  | 6 +++++-
 .../flink/table/planner/plan/schema/TypedFlinkTableFunction.scala   | 6 +++++-
 3 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala
index 380484b..cd0b87e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala
@@ -23,8 +23,7 @@ import org.apache.flink.table.functions.{BuiltInFunctionDefinitions, FunctionIde
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
 import org.apache.flink.table.planner.functions.utils.ScalarSqlFunction._
-import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.{getOperandType, _}
-import org.apache.flink.table.runtime.types.ClassLogicalTypeConverter.getDefaultExternalClassForType
+import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
 import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
 import org.apache.flink.table.types.logical.LogicalType
@@ -35,8 +34,6 @@ import org.apache.calcite.sql.`type`.SqlOperandTypeChecker.Consistency
 import org.apache.calcite.sql.`type`._
 import org.apache.calcite.sql.parser.SqlParserPos
 
-import scala.collection.JavaConverters._
-
 /**
   * Calcite wrapper for user-defined scalar functions.
   *
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/DeferredTypeFlinkTableFunction.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/DeferredTypeFlinkTableFunction.scala
index 79b278c..e7c6a1d 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/DeferredTypeFlinkTableFunction.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/DeferredTypeFlinkTableFunction.scala
@@ -19,7 +19,8 @@
 package org.apache.flink.table.planner.plan.schema
 
 import org.apache.flink.table.functions
-import org.apache.flink.table.functions.TableFunction
+import org.apache.flink.table.functions.{BuiltInFunctionDefinitions, TableFunction}
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
 import org.apache.flink.table.types.DataType
@@ -33,7 +34,10 @@ import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
   *
   * @param tableFunction The Table Function instance
   * @param implicitResultType Implicit result type.
+  * @deprecated Use [[BuiltInFunctionDefinitions]] that translates to [[BridgingSqlFunction]].
   */
+@Deprecated
+@deprecated
 class DeferredTypeFlinkTableFunction(
     val tableFunction: TableFunction[_],
     val implicitResultType: DataType)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TypedFlinkTableFunction.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TypedFlinkTableFunction.scala
index 5b8e5af..eba0ff4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TypedFlinkTableFunction.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TypedFlinkTableFunction.scala
@@ -19,8 +19,9 @@
 package org.apache.flink.table.planner.plan.schema
 
 import org.apache.flink.table.functions
-import org.apache.flink.table.functions.TableFunction
+import org.apache.flink.table.functions.{BuiltInFunctionDefinitions, TableFunction}
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
 import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
 import org.apache.flink.table.types.DataType
@@ -34,7 +35,10 @@ import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
   *
   * @param tableFunction The Table Function instance
   * @param externalResultType The result type which has been determined
+  * @deprecated Use [[BuiltInFunctionDefinitions]] that translates to [[BridgingSqlFunction]].
   */
+@Deprecated
+@deprecated
 class TypedFlinkTableFunction(
     val tableFunction: TableFunction[_],
     fieldNames: Array[String],
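
Both deprecation markers are applied so that the warning reaches every consumer of these Scala classes: Java's @Deprecated is honored by javac and Java tooling, while Scala's @deprecated triggers scalac warnings for Scala callers. The pattern in isolation (class name is illustrative):

    /** @deprecated Use the replacement from the new function stack. */
    @Deprecated   // surfaces in javac warnings for Java callers
    @deprecated   // surfaces in scalac warnings for Scala callers
    class LegacyPlannerHelper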

[flink] 02/03: [FLINK-26518][table-planner] Port FlinkRelBuilder to Java

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f8cb19e70ca7da6423dfb01b97e05c4d520c9fde
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed Mar 9 12:14:46 2022 +0100

    [FLINK-26518][table-planner] Port FlinkRelBuilder to Java
---
 .../table/planner/calcite/FlinkRelBuilder.java     | 235 +++++++++++++++++++++
 .../catalog/QueryOperationCatalogViewTable.java    |   8 +-
 .../table/planner/delegation/PlannerContext.java   |   2 +-
 .../plan/rules/logical/SubQueryDecorrelator.java   |   3 +-
 .../BatchPhysicalPythonWindowAggregateRule.java    |   3 +-
 ...reamPhysicalPythonGroupWindowAggregateRule.java |   2 +-
 .../table/planner/calcite/FlinkRelBuilder.scala    | 233 --------------------
 .../planner/expressions/fieldExpression.scala      |   7 -
 .../planner/expressions/windowProperties.scala     |   5 -
 .../nodes/calcite/LogicalWindowAggregate.scala     |   6 +-
 .../calcite/LogicalWindowTableAggregate.scala      |   4 +-
 .../plan/nodes/calcite/WindowAggregate.scala       |  11 +-
 .../plan/nodes/calcite/WindowTableAggregate.scala  |  10 +-
 .../logical/FlinkLogicalWindowAggregate.scala      |   2 +-
 .../logical/FlinkLogicalWindowTableAggregate.scala |   2 +-
 .../rules/logical/CorrelateSortToRankRule.scala    |   2 +-
 16 files changed, 267 insertions(+), 268 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
new file mode 100644
index 0000000..35ab473
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRelBuilder.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.calcite;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.planner.calcite.FlinkRelFactories.ExpandFactory;
+import org.apache.flink.table.planner.calcite.FlinkRelFactories.RankFactory;
+import org.apache.flink.table.planner.hint.FlinkHints;
+import org.apache.flink.table.planner.plan.QueryOperationConverter;
+import org.apache.flink.table.planner.plan.logical.LogicalWindow;
+import org.apache.flink.table.planner.plan.nodes.calcite.LogicalTableAggregate;
+import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowAggregate;
+import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowTableAggregate;
+import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
+import org.apache.flink.table.runtime.operators.rank.RankRange;
+import org.apache.flink.table.runtime.operators.rank.RankType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptSchema;
+import org.apache.calcite.plan.RelOptTable.ToRelContext;
+import org.apache.calcite.plan.ViewExpanders;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.UnaryOperator;
+
+import static org.apache.flink.table.planner.plan.utils.AggregateUtil.isTableAggregate;
+
+/** Flink-specific {@link RelBuilder}. */
+@Internal
+public final class FlinkRelBuilder extends RelBuilder {
+
+    private final QueryOperationConverter toRelNodeConverter;
+
+    private final ExpandFactory expandFactory;
+
+    private final RankFactory rankFactory;
+
+    private FlinkRelBuilder(Context context, RelOptCluster cluster, RelOptSchema relOptSchema) {
+        super(context, cluster, relOptSchema);
+
+        this.toRelNodeConverter =
+                new QueryOperationConverter(this, context.unwrap(FlinkContext.class).isBatchMode());
+        this.expandFactory =
+                Util.first(
+                        context.unwrap(ExpandFactory.class),
+                        FlinkRelFactories.DEFAULT_EXPAND_FACTORY());
+        this.rankFactory =
+                Util.first(
+                        context.unwrap(RankFactory.class),
+                        FlinkRelFactories.DEFAULT_RANK_FACTORY());
+    }
+
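+    /** Creates a {@code FlinkRelBuilder} for the given non-null context. */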
+    public static FlinkRelBuilder of(
+            Context context, RelOptCluster cluster, RelOptSchema relOptSchema) {
+        return new FlinkRelBuilder(Preconditions.checkNotNull(context), cluster, relOptSchema);
+    }
+
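+    /** Creates a {@code FlinkRelBuilder} using the context of the cluster's planner. */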
+    public static FlinkRelBuilder of(RelOptCluster cluster, RelOptSchema relOptSchema) {
+        return FlinkRelBuilder.of(cluster.getPlanner().getContext(), cluster, relOptSchema);
+    }
+
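+    /** Creates a {@link RelBuilderFactory} that chains the given context with each cluster's context. */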
+    public static RelBuilderFactory proto(Context context) {
+        return (cluster, schema) -> {
+            final Context clusterContext = cluster.getPlanner().getContext();
+            final Context chain = Contexts.chain(context, clusterContext);
+            return FlinkRelBuilder.of(chain, cluster, schema);
+        };
+    }
+
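+    /** Builds an expand relational node on top of the current input. */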
+    public RelBuilder expand(List<List<RexNode>> projects, int expandIdIndex) {
+        final RelNode input = build();
+        final RelNode expand = expandFactory.createExpand(input, projects, expandIdIndex);
+        return push(expand);
+    }
+
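+    /** Builds a rank relational node on top of the current input. */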
+    public RelBuilder rank(
+            ImmutableBitSet partitionKey,
+            RelCollation orderKey,
+            RankType rankType,
+            RankRange rankRange,
+            RelDataTypeField rankNumberType,
+            boolean outputRankNumber) {
+        final RelNode input = build();
+        final RelNode rank =
+                rankFactory.createRank(
+                        input,
+                        partitionKey,
+                        orderKey,
+                        rankType,
+                        rankRange,
+                        rankNumberType,
+                        outputRankNumber);
+        return push(rank);
+    }
+
+    /** Builds a non-window aggregate for either an aggregate or a table aggregate. */
+    @Override
+    public RelBuilder aggregate(
+            RelBuilder.GroupKey groupKey, Iterable<RelBuilder.AggCall> aggCalls) {
+        // build a RelNode; build() may also return a Project on top
+        RelNode relNode = super.aggregate(groupKey, aggCalls).build();
+
+        if (relNode instanceof LogicalAggregate) {
+            final LogicalAggregate logicalAggregate = (LogicalAggregate) relNode;
+            if (isTableAggregate(logicalAggregate.getAggCallList())) {
+                relNode = LogicalTableAggregate.create(logicalAggregate);
+            } else if (isCountStarAgg(logicalAggregate)) {
+                final RelNode newAggInput =
+                        push(logicalAggregate.getInput(0)).project(literal(0)).build();
+                relNode =
+                        logicalAggregate.copy(
+                                logicalAggregate.getTraitSet(), ImmutableList.of(newAggInput));
+            }
+        }
+
+        return push(relNode);
+    }
+
+    /** Builds a window aggregate for either an aggregate or a table aggregate. */
+    public RelBuilder windowAggregate(
+            LogicalWindow window,
+            GroupKey groupKey,
+            List<NamedWindowProperty> namedProperties,
+            Iterable<AggCall> aggCalls) {
+        // build logical aggregate
+
+        // Because of:
+        // [CALCITE-3763] RelBuilder.aggregate should prune unused fields from the input,
+        // if the input is a Project.
+        //
+        // a field cannot be pruned if it is referenced by other expressions
+        // of the window aggregation (e.g. TUMBLE_START/END).
+        // To work around this, we configure the RelBuilder to disable this feature.
+        final LogicalAggregate aggregate =
+                (LogicalAggregate)
+                        super.transform(t -> t.withPruneInputOfAggregate(false))
+                                .push(build())
+                                .aggregate(groupKey, aggCalls)
+                                .build();
+
+        // build logical window aggregate from it
+        final RelNode windowAggregate;
+        if (isTableAggregate(aggregate.getAggCallList())) {
+            windowAggregate =
+                    LogicalWindowTableAggregate.create(window, namedProperties, aggregate);
+        } else {
+            windowAggregate = LogicalWindowAggregate.create(window, namedProperties, aggregate);
+        }
+        return push(windowAggregate);
+    }
+
+    /** Builds a watermark assigner relational node. */
+    public RelBuilder watermark(int rowtimeFieldIndex, RexNode watermarkExpr) {
+        final RelNode input = build();
+        final RelNode relNode =
+                LogicalWatermarkAssigner.create(cluster, input, rowtimeFieldIndex, watermarkExpr);
+        return push(relNode);
+    }
+
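+    /** Converts the given {@link QueryOperation} into a relational tree and pushes it onto the stack. */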
+    public RelBuilder queryOperation(QueryOperation queryOperation) {
+        final RelNode relNode = queryOperation.accept(toRelNodeConverter);
+        return push(relNode);
+    }
+
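+    /** Scans the given table and attaches the dynamic options as an OPTIONS hint. */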
+    public RelBuilder scan(ObjectIdentifier identifier, Map<String, String> dynamicOptions) {
+        final List<RelHint> hints = new ArrayList<>();
+        hints.add(
+                RelHint.builder(FlinkHints.HINT_NAME_OPTIONS).hintOptions(dynamicOptions).build());
+        final ToRelContext toRelContext = ViewExpanders.simpleContext(cluster, hints);
+        final RelNode relNode =
+                relOptSchema.getTableForMember(identifier.toList()).toRel(toRelContext);
+        return push(relNode);
+    }
+
+    @Override
+    public FlinkTypeFactory getTypeFactory() {
+        return (FlinkTypeFactory) super.getTypeFactory();
+    }
+
+    @Override
+    public RelBuilder transform(UnaryOperator<Config> transform) {
+        // Override in order to return a FlinkRelBuilder.
+        final Context mergedContext =
+                Contexts.of(transform.apply(Config.DEFAULT), cluster.getPlanner().getContext());
+        return FlinkRelBuilder.of(mergedContext, cluster, relOptSchema);
+    }
+
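+    /** Checks for a single global COUNT(*) aggregation without filter and arguments. */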
+    private static boolean isCountStarAgg(LogicalAggregate agg) {
+        if (agg.getGroupCount() != 0 || agg.getAggCallList().size() != 1) {
+            return false;
+        }
+        final AggregateCall call = agg.getAggCallList().get(0);
+        return call.getAggregation().getKind() == SqlKind.COUNT
+                && call.filterArg == -1
+                && call.getArgList().isEmpty();
+    }
+}
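
Usage note: the overridden aggregate() above transparently rewrites a global
COUNT(*) so that the aggregate consumes a constant column instead of all
input fields. A minimal sketch of how this surfaces to callers, assuming a
builder whose schema already contains a table; the name "MyTable" and the
alias "cnt" are made up for illustration:

    import org.apache.calcite.rel.RelNode;
    import org.apache.flink.table.planner.calcite.FlinkRelBuilder;

    static RelNode countStar(FlinkRelBuilder builder) {
        return builder
                .scan("MyTable")
                // hits the isCountStarAgg() branch: the aggregate input is
                // replaced by a projection of the literal 0
                .aggregate(builder.groupKey(), builder.count(false, "cnt"))
                .build();
    }
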
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java
index 9ba8e6e..32353c1 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/QueryOperationCatalogViewTable.java
@@ -25,6 +25,9 @@ import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
 import org.apache.flink.table.planner.plan.schema.ExpandingPreparingTable;
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
 
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptSchema;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rel.RelNode;
@@ -70,8 +73,9 @@ public class QueryOperationCatalogViewTable extends ExpandingPreparingTable {
 
     @Override
     public RelNode convertToRel(RelOptTable.ToRelContext context) {
-        FlinkRelBuilder relBuilder =
-                FlinkRelBuilder.of(context, context.getCluster(), this.getRelOptSchema());
+        final RelOptCluster cluster = context.getCluster();
+        final Context chain = Contexts.of(context, cluster.getPlanner().getContext());
+        final FlinkRelBuilder relBuilder = FlinkRelBuilder.of(chain, cluster, getRelOptSchema());
 
         return relBuilder.queryOperation(catalogView.getQueryOperation()).build();
     }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
index 19a4b6e..e98e582 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/PlannerContext.java
@@ -184,7 +184,7 @@ public class PlannerContext {
                         context,
                         // Sets up the ViewExpander explicitly for FlinkRelBuilder.
                         createFlinkPlanner(currentCatalog, currentDatabase).createToRelContext());
-        return new FlinkRelBuilder(chain, cluster, relOptSchema);
+        return FlinkRelBuilder.of(chain, cluster, relOptSchema);
     }
 
     /**
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SubQueryDecorrelator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SubQueryDecorrelator.java
index b80b884..828286b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SubQueryDecorrelator.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SubQueryDecorrelator.java
@@ -134,8 +134,7 @@ public class SubQueryDecorrelator extends RelShuttleImpl {
         }
 
         RelOptCluster cluster = rootRel.getCluster();
-        RelBuilder relBuilder =
-                new FlinkRelBuilder(cluster.getPlanner().getContext(), cluster, null);
+        RelBuilder relBuilder = FlinkRelBuilder.of(cluster, null);
         RexBuilder rexBuilder = cluster.getRexBuilder();
 
         final SubQueryDecorrelator decorrelator =
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonWindowAggregateRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonWindowAggregateRule.java
index 9eadb76..847b803 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonWindowAggregateRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalPythonWindowAggregateRule.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
 import org.apache.flink.table.planner.plan.utils.AggregateUtil;
 import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
 import org.apache.flink.table.planner.plan.utils.PythonUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.types.DataType;
 
 import org.apache.calcite.plan.RelOptRule;
@@ -160,7 +161,7 @@ public class BatchPhysicalPythonWindowAggregateRule extends RelOptRule {
                         window,
                         inputTimeFieldIndex,
                         inputTimeIsDate,
-                        agg.getNamedProperties());
+                        JavaScalaConversionUtil.toScala(agg.getNamedProperties()));
         call.transformTo(windowAgg);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java
index 002f1ae..5f080d7 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalPythonGroupWindowAggregateRule.java
@@ -143,7 +143,7 @@ public class StreamPhysicalPythonGroupWindowAggregateRule extends ConverterRule
                 agg.getGroupSet().toArray(),
                 JavaScalaConversionUtil.toScala(aggCalls),
                 agg.getWindow(),
-                agg.getNamedProperties(),
+                JavaScalaConversionUtil.toScala(agg.getNamedProperties()),
                 emitStrategy);
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala
deleted file mode 100644
index b96e510..0000000
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkRelBuilder.scala
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.calcite
-
-import org.apache.flink.table.operations.QueryOperation
-import org.apache.flink.table.planner.calcite.FlinkRelFactories.{ExpandFactory, RankFactory}
-import org.apache.flink.table.planner.expressions.WindowProperty
-import org.apache.flink.table.planner.plan.QueryOperationConverter
-import org.apache.flink.table.planner.plan.logical.LogicalWindow
-import org.apache.flink.table.planner.plan.nodes.calcite.{LogicalTableAggregate, LogicalWatermarkAssigner, LogicalWindowAggregate, LogicalWindowTableAggregate}
-import org.apache.flink.table.planner.plan.utils.AggregateUtil
-import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty
-import org.apache.flink.table.runtime.operators.rank.{RankRange, RankType}
-
-import com.google.common.collect.ImmutableList
-import org.apache.calcite.plan._
-import org.apache.calcite.rel.RelCollation
-import org.apache.calcite.rel.`type`.RelDataTypeField
-import org.apache.calcite.rel.hint.RelHint
-import org.apache.calcite.rel.logical.LogicalAggregate
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.SqlKind
-import org.apache.calcite.tools.RelBuilder.{AggCall, Config, GroupKey}
-import org.apache.calcite.tools.{RelBuilder, RelBuilderFactory}
-import org.apache.calcite.util.{ImmutableBitSet, Util}
-import org.apache.flink.table.catalog.ObjectIdentifier
-import org.apache.flink.table.planner.hint.FlinkHints
-
-import java.lang.Iterable
-import java.util
-import java.util.List
-import java.util.function.UnaryOperator
-
-import scala.collection.JavaConversions._
-
-/**
-  * Flink specific [[RelBuilder]] that changes the default type factory to a [[FlinkTypeFactory]].
-  */
-class FlinkRelBuilder(
-    context: Context,
-    relOptCluster: RelOptCluster,
-    relOptSchema: RelOptSchema)
-  extends RelBuilder(
-    context,
-    relOptCluster,
-    relOptSchema) {
-
-  require(context != null)
-
-  private val toRelNodeConverter = {
-    new QueryOperationConverter(this, context.unwrap(classOf[FlinkContext]).isBatchMode)
-  }
-
-  private val expandFactory: ExpandFactory = {
-    Util.first(context.unwrap(classOf[ExpandFactory]), FlinkRelFactories.DEFAULT_EXPAND_FACTORY)
-  }
-
-  private val rankFactory: RankFactory = {
-    Util.first(context.unwrap(classOf[RankFactory]), FlinkRelFactories.DEFAULT_RANK_FACTORY)
-  }
-
-  override def getRelOptSchema: RelOptSchema = relOptSchema
-
-  override def getCluster: RelOptCluster = relOptCluster
-
-  override def getTypeFactory: FlinkTypeFactory =
-    super.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-
-  override def transform(transform: UnaryOperator[RelBuilder.Config]): FlinkRelBuilder = {
-    // Override in order to return a FlinkRelBuilder.
-    FlinkRelBuilder.of(transform.apply(Config.DEFAULT), cluster, relOptSchema)
-  }
-
-  def expand(
-      projects: util.List[util.List[RexNode]],
-      expandIdIndex: Int): RelBuilder = {
-    val input = build()
-    val expand = expandFactory.createExpand(input, projects, expandIdIndex)
-    push(expand)
-  }
-
-  def rank(
-      partitionKey: ImmutableBitSet,
-      orderKey: RelCollation,
-      rankType: RankType,
-      rankRange: RankRange,
-      rankNumberType: RelDataTypeField,
-      outputRankNumber: Boolean): RelBuilder = {
-    val input = build()
-    val rank = rankFactory.createRank(input, partitionKey, orderKey, rankType, rankRange,
-      rankNumberType, outputRankNumber)
-    push(rank)
-  }
-
-  /**
-    * Build non-window aggregate for either aggregate or table aggregate.
-    */
-  override def aggregate(groupKey: GroupKey, aggCalls: Iterable[AggCall]): RelBuilder = {
-    // build a relNode, the build() may also return a project
-    val relNode = super.aggregate(groupKey, aggCalls).build()
-
-    def isCountStartAgg(agg: LogicalAggregate): Boolean = {
-      if (agg.getGroupCount != 0 || agg.getAggCallList.size() != 1) {
-        return false
-      }
-      val call = agg.getAggCallList.head
-      call.getAggregation.getKind == SqlKind.COUNT &&
-          call.filterArg == -1 && call.getArgList.isEmpty
-    }
-
-    relNode match {
-      case logicalAggregate: LogicalAggregate
-        if AggregateUtil.isTableAggregate(logicalAggregate.getAggCallList) =>
-        push(LogicalTableAggregate.create(logicalAggregate))
-      case logicalAggregate2: LogicalAggregate
-        if isCountStartAgg(logicalAggregate2) =>
-        val newAggInput = push(logicalAggregate2.getInput(0))
-            .project(literal(0)).build()
-        push(logicalAggregate2.copy(logicalAggregate2.getTraitSet, ImmutableList.of(newAggInput)))
-      case _ => push(relNode)
-    }
-  }
-
-  /**
-    * Build window aggregate for either aggregate or table aggregate.
-    */
-  def windowAggregate(
-      window: LogicalWindow,
-      groupKey: GroupKey,
-      namedProperties: List[NamedWindowProperty],
-      aggCalls: Iterable[AggCall]): RelBuilder = {
-    // build logical aggregate
-
-    // Because of:
-    // [CALCITE-3763] RelBuilder.aggregate should prune unused fields from the input,
-    // if the input is a Project.
-    //
-    // the field can not be pruned if it is referenced by other expressions
-    // of the window aggregation(i.e. the TUMBLE_START/END).
-    // To solve this, we config the RelBuilder to forbidden this feature.
-    val aggregate = super.transform(
-      new UnaryOperator[RelBuilder.Config] {
-        override def apply(t: RelBuilder.Config)
-          : RelBuilder.Config = t.withPruneInputOfAggregate(false)
-      })
-      .push(build())
-      .aggregate(groupKey, aggCalls)
-      .build()
-      .asInstanceOf[LogicalAggregate]
-
-    // build logical window aggregate from it
-    aggregate match {
-      case logicalAggregate: LogicalAggregate
-        if AggregateUtil.isTableAggregate(logicalAggregate.getAggCallList) =>
-        push(LogicalWindowTableAggregate.create(window, namedProperties, aggregate))
-      case _ => push(LogicalWindowAggregate.create(window, namedProperties, aggregate))
-    }
-  }
-
-  /**
-    * Build watermark assigner relation node.
-    */
-  def watermark(rowtimeFieldIndex: Int, watermarkExpr: RexNode): RelBuilder = {
-    val input = build()
-    val watermarkAssigner = LogicalWatermarkAssigner
-      .create(cluster, input, rowtimeFieldIndex, watermarkExpr)
-    push(watermarkAssigner)
-    this
-  }
-
-  def queryOperation(queryOperation: QueryOperation): RelBuilder = {
-    val relNode = queryOperation.accept(toRelNodeConverter)
-    push(relNode)
-    this
-  }
-
-  def scan(
-      identifier: ObjectIdentifier,
-      dynamicOptions: util.Map[String, String]): RelBuilder = {
-    val hints = new util.ArrayList[RelHint]
-    hints.add(RelHint.builder(FlinkHints.HINT_NAME_OPTIONS).hintOptions(dynamicOptions).build)
-    val toRelContext = ViewExpanders.simpleContext(cluster, hints)
-    push(relOptSchema.getTableForMember(identifier.toList).toRel(toRelContext))
-    this
-  }
-}
-
-object FlinkRelBuilder {
-
-  case class NamedWindowProperty(name: String, property: WindowProperty)
-
-  def proto(context: Context): RelBuilderFactory = new RelBuilderFactory() {
-    def create(cluster: RelOptCluster, schema: RelOptSchema): RelBuilder = {
-      val clusterContext = cluster.getPlanner.getContext.unwrap(classOf[FlinkContext])
-      val mergedContext = Contexts.chain(context, clusterContext)
-
-      new FlinkRelBuilder(mergedContext, cluster, schema)
-    }
-  }
-
-  def of(cluster: RelOptCluster, relOptSchema: RelOptSchema): FlinkRelBuilder = {
-    val clusterContext = cluster.getPlanner.getContext
-    new FlinkRelBuilder(
-      clusterContext,
-      cluster,
-      relOptSchema)
-  }
-
-  def of(contextVar: Object, cluster: RelOptCluster, relOptSchema: RelOptSchema)
-    : FlinkRelBuilder = {
-    val mergedContext = Contexts.of(contextVar, cluster.getPlanner.getContext)
-    new FlinkRelBuilder(
-      mergedContext,
-      cluster,
-      relOptSchema)
-  }
-}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala
index fd86f67..013e687 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/fieldExpression.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.planner.expressions
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api._
 import org.apache.flink.table.operations.QueryOperation
-import org.apache.flink.table.planner.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory._
 import org.apache.flink.table.planner.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
@@ -150,9 +149,6 @@ case class RowtimeAttribute(expr: PlannerExpression) extends TimeAttribute(expr)
     }
   }
 
-  override def toNamedWindowProperty(name: String): NamedWindowProperty =
-    NamedWindowProperty(name, this)
-
   override def toString: String = s"rowtime($child)"
 }
 
@@ -174,9 +170,6 @@ case class ProctimeAttribute(expr: PlannerExpression) extends TimeAttribute(expr
   override def resultType: TypeInformation[_] =
     TimeIndicatorTypeInfo.PROCTIME_INDICATOR
 
-  override def toNamedWindowProperty(name: String): NamedWindowProperty =
-    NamedWindowProperty(name, this)
-
   override def toString: String = s"proctime($child)"
 }
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/windowProperties.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/windowProperties.scala
index 0e68163..ce6940a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/windowProperties.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/expressions/windowProperties.scala
@@ -19,13 +19,10 @@
 package org.apache.flink.table.planner.expressions
 
 import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.table.planner.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.planner.validate.{ValidationFailure, ValidationSuccess}
 
 trait WindowProperty {
 
-  def toNamedWindowProperty(name: String): NamedWindowProperty
-
   def resultType: TypeInformation[_]
 
 }
@@ -42,8 +39,6 @@ abstract class AbstractWindowProperty(child: PlannerExpression)
     } else {
       ValidationFailure("Child must be a window reference.")
     }
-
-  def toNamedWindowProperty(name: String): NamedWindowProperty = NamedWindowProperty(name, this)
 }
 
 case class WindowStart(child: PlannerExpression) extends AbstractWindowProperty(child) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowAggregate.scala
index 0d70fd7..f3e35e1 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowAggregate.scala
@@ -36,7 +36,7 @@ final class LogicalWindowAggregate(
     groupSet: ImmutableBitSet,
     aggCalls: util.List[AggregateCall],
     window: LogicalWindow,
-    namedProperties: Seq[NamedWindowProperty])
+    namedProperties: util.List[NamedWindowProperty])
   extends WindowAggregate(cluster, traitSet, child, groupSet, aggCalls, window, namedProperties) {
 
   override def copy(
@@ -55,7 +55,7 @@ final class LogicalWindowAggregate(
       namedProperties)
   }
 
-  def copy(namedProperties: Seq[NamedWindowProperty]): LogicalWindowAggregate = {
+  def copy(namedProperties: util.List[NamedWindowProperty]): LogicalWindowAggregate = {
     new LogicalWindowAggregate(
       cluster,
       traitSet,
@@ -71,7 +71,7 @@ object LogicalWindowAggregate {
 
   def create(
       window: LogicalWindow,
-      namedProperties: Seq[NamedWindowProperty],
+      namedProperties: util.List[NamedWindowProperty],
       agg: Aggregate): LogicalWindowAggregate = {
     require(agg.getGroupType == Group.SIMPLE)
     val cluster: RelOptCluster = agg.getCluster
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowTableAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowTableAggregate.scala
index 6ae042f..f9a2ed2 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowTableAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalWindowTableAggregate.scala
@@ -41,7 +41,7 @@ class LogicalWindowTableAggregate(
     groupSets: util.List[ImmutableBitSet],
     aggCalls: util.List[AggregateCall],
     window: LogicalWindow,
-    namedProperties: Seq[NamedWindowProperty])
+    namedProperties: util.List[NamedWindowProperty])
   extends WindowTableAggregate(
     cluster,
     traitSet,
@@ -69,7 +69,7 @@ object LogicalWindowTableAggregate {
 
   def create(
     window: LogicalWindow,
-    namedProperties: Seq[NamedWindowProperty],
+    namedProperties: util.List[NamedWindowProperty],
     aggregate: Aggregate): LogicalWindowTableAggregate = {
 
     val cluster: RelOptCluster = aggregate.getCluster
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowAggregate.scala
index 884be0a..c28dd1a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowAggregate.scala
@@ -31,6 +31,9 @@ import org.apache.calcite.util.ImmutableBitSet
 
 import java.util
 
+import scala.collection.JavaConverters._
+
+
 /**
   * Relational operator that eliminates duplicates and computes totals with time window group.
   *
@@ -43,7 +46,7 @@ abstract class WindowAggregate(
     groupSet: ImmutableBitSet,
     aggCalls: util.List[AggregateCall],
     window: LogicalWindow,
-    namedProperties: Seq[NamedWindowProperty])
+    namedProperties: util.List[NamedWindowProperty])
   extends Aggregate(
     cluster,
     traitSet,
@@ -54,7 +57,7 @@ abstract class WindowAggregate(
 
   def getWindow: LogicalWindow = window
 
-  def getNamedProperties: Seq[NamedWindowProperty] = namedProperties
+  def getNamedProperties: util.List[NamedWindowProperty] = namedProperties
 
   override def accept(shuttle: RelShuttle): RelNode = shuttle.visit(this)
 
@@ -63,7 +66,7 @@ abstract class WindowAggregate(
     val typeFactory = getCluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
     val builder = typeFactory.builder
     builder.addAll(aggregateRowType.getFieldList)
-    namedProperties.foreach { namedProp =>
+    namedProperties.asScala.foreach { namedProp =>
       builder.add(
         namedProp.getName,
         typeFactory.createFieldTypeFromLogicalType(namedProp.getProperty.getResultType)
@@ -82,6 +85,6 @@ abstract class WindowAggregate(
   override def explainTerms(pw: RelWriter): RelWriter = {
     super.explainTerms(pw)
       .item("window", window)
-      .item("properties", namedProperties.map(_.getName).mkString(", "))
+      .item("properties", namedProperties.asScala.map(_.getName).mkString(", "))
   }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowTableAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowTableAggregate.scala
index 98bcaea..d388b5b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowTableAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/WindowTableAggregate.scala
@@ -30,6 +30,8 @@ import org.apache.calcite.util.ImmutableBitSet
 
 import java.util
 
+import scala.collection.JavaConverters._
+
 /**
   * Relational operator that represents a window table aggregate. A TableAggregate is similar to the
   * [[org.apache.calcite.rel.core.Aggregate]] but may output 0 or more records for a group.
@@ -42,19 +44,19 @@ abstract class WindowTableAggregate(
     groupSets: util.List[ImmutableBitSet],
     aggCalls: util.List[AggregateCall],
     window: LogicalWindow,
-    namedProperties: Seq[NamedWindowProperty])
+    namedProperties: util.List[NamedWindowProperty])
   extends TableAggregate(cluster, traitSet, input, groupSet, groupSets, aggCalls) {
 
   def getWindow: LogicalWindow = window
 
-  def getNamedProperties: Seq[NamedWindowProperty] = namedProperties
+  def getNamedProperties: util.List[NamedWindowProperty] = namedProperties
 
   override def deriveRowType(): RelDataType = {
     val aggregateRowType = super.deriveRowType()
     val typeFactory = getCluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
     val builder = typeFactory.builder
     builder.addAll(aggregateRowType.getFieldList)
-    namedProperties.foreach { namedProp =>
+    namedProperties.asScala.foreach { namedProp =>
       builder.add(
         namedProp.getName,
         typeFactory.createFieldTypeFromLogicalType(namedProp.getProperty.getResultType)
@@ -66,6 +68,6 @@ abstract class WindowTableAggregate(
   override def explainTerms(pw: RelWriter): RelWriter = {
     super.explainTerms(pw)
       .item("window", window)
-      .item("properties", namedProperties.map(_.getName).mkString(", "))
+      .item("properties", namedProperties.asScala.map(_.getName).mkString(", "))
   }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
index 8cef117..7884f59 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
@@ -43,7 +43,7 @@ class FlinkLogicalWindowAggregate(
     groupSet: ImmutableBitSet,
     aggCalls: util.List[AggregateCall],
     window: LogicalWindow,
-    namedProperties: Seq[NamedWindowProperty])
+    namedProperties: util.List[NamedWindowProperty])
   extends WindowAggregate(cluster, traitSet, child, groupSet, aggCalls, window, namedProperties)
   with FlinkLogicalRel {
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala
index add5d41..7dcc81b 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalWindowTableAggregate.scala
@@ -44,7 +44,7 @@ class FlinkLogicalWindowTableAggregate(
     groupSets: util.List[ImmutableBitSet],
     aggCalls: util.List[AggregateCall],
     window: LogicalWindow,
-    namedProperties: Seq[NamedWindowProperty])
+    namedProperties: util.List[NamedWindowProperty])
   extends WindowTableAggregate(
     cluster,
     traitSet,
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CorrelateSortToRankRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CorrelateSortToRankRule.scala
index b2d0797..e8013a8 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CorrelateSortToRankRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/CorrelateSortToRankRule.scala
@@ -175,7 +175,7 @@ class CorrelateSortToRankRule extends RelOptRule(
           1,
           sort.fetch.asInstanceOf[RexLiteral].getValueAs(classOf[java.lang.Long])),
         null,
-        outputRankNumber = false)
+        false)
       .project(projects)
       .build()