You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/07/23 11:47:03 UTC

[flink] branch master updated: [FLINK-13318][table-planner-blink] Fix Blink planner tests failing on Scala 2.12 by setting SerialVersionUID to Scala UDFs

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 898b190  [FLINK-13318][table-planner-blink] Fix Blink planner tests failing on Scala 2.12 by setting SerialVersionUID to Scala UDFs
898b190 is described below

commit 898b190ab0c69617a6a0208dc553e0399761730d
Author: beyond1920 <be...@126.com>
AuthorDate: Mon Jul 22 14:29:26 2019 +0800

    [FLINK-13318][table-planner-blink] Fix Blink planner tests failing on Scala 2.12 by setting SerialVersionUID to Scala UDFs
    
    The plan tests failed on Scala 2.12 because:
    
    1. we print the UDF's MD5 into the node explanation
    2. the Blink planner uses XML files to verify plans, and the expected XML content is the same for all Scala versions
    3. the serialization behavior differs between Scala 2.11 and 2.12 when no SerialVersionUID is defined.
    
    That's why we add a SerialVersionUID to all existing Scala UDFs. This does not mean that users are forced to set one; it is only a workaround so that the plan tests produce the same result across Scala versions. Users can still use a Scala UDF without a SerialVersionUID in both the Blink planner and the Flink planner.
    
    This closes #9193
---
 .../table/plan/util/ExplodeFunctionUtil.scala      |  9 +++++
 .../flink/table/plan/batch/table/CalcTest.xml      |  6 +--
 .../flink/table/plan/batch/table/CorrelateTest.xml | 24 ++++++------
 .../flink/table/plan/batch/table/JoinTest.xml      |  2 +-
 .../stringexpr/CorrelateStringExpressionTest.xml   | 32 ++++++++--------
 .../PushFilterIntoTableSourceScanRuleTest.xml      |  2 +-
 .../flink/table/plan/stream/table/CalcTest.xml     |  6 +--
 .../plan/stream/table/ColumnFunctionsTest.xml      |  8 ++--
 .../table/plan/stream/table/CorrelateTest.xml      | 44 +++++++++++-----------
 .../table/plan/stream/table/OverWindowTest.xml     |  2 +-
 .../utils/userDefinedScalarFunctions.scala         | 31 +++++++++++++++
 .../flink/table/plan/batch/table/CalcTest.scala    |  2 +
 .../plan/batch/table/ColumnFunctionsTest.scala     |  1 +
 .../flink/table/plan/batch/table/JoinTest.scala    |  1 +
 .../plan/stream/sql/ModifiedMonotonicityTest.scala |  1 +
 .../plan/stream/sql/join/LookupJoinTest.scala      | 10 +++++
 .../plan/stream/table/ColumnFunctionsTest.scala    |  1 +
 .../table/runtime/batch/sql/CorrelateITCase.scala  |  7 +++-
 .../table/runtime/batch/table/CalcITCase.scala     |  2 +
 .../runtime/stream/sql/MatchRecognizeITCase.scala  |  2 +
 .../table/runtime/stream/table/CalcITCase.scala    |  8 ++--
 .../utils/InMemoryLookupableTableSource.scala      |  1 +
 .../utils/UserDefinedFunctionTestUtils.scala       | 24 ++++++++++++
 .../table/util/UserDefinedTableFunctions.scala     | 15 ++++++++
 24 files changed, 173 insertions(+), 68 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
index d0f9475..f5e8187 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/ExplodeFunctionUtil.scala
@@ -29,6 +29,7 @@ import java.util
 
 import scala.collection.JavaConverters._
 
+@SerialVersionUID(1L)
 class ObjectExplodeTableFunc(componentType: TypeInformation[_]) extends TableFunction[Object] {
   def eval(arr: Array[Object]): Unit = {
     arr.foreach(collect)
@@ -47,6 +48,7 @@ class ObjectExplodeTableFunc(componentType: TypeInformation[_]) extends TableFun
   }
 }
 
+@SerialVersionUID(1L)
 class FloatExplodeTableFunc extends TableFunction[Float] {
   def eval(arr: Array[Float]): Unit = {
     arr.foreach(collect)
@@ -57,6 +59,7 @@ class FloatExplodeTableFunc extends TableFunction[Float] {
   }
 }
 
+@SerialVersionUID(1L)
 class ShortExplodeTableFunc extends TableFunction[Short] {
   def eval(arr: Array[Short]): Unit = {
     arr.foreach(collect)
@@ -67,6 +70,7 @@ class ShortExplodeTableFunc extends TableFunction[Short] {
   }
 }
 
+@SerialVersionUID(1L)
 class IntExplodeTableFunc extends TableFunction[Int] {
   def eval(arr: Array[Int]): Unit = {
     arr.foreach(collect)
@@ -77,6 +81,7 @@ class IntExplodeTableFunc extends TableFunction[Int] {
   }
 }
 
+@SerialVersionUID(1L)
 class LongExplodeTableFunc extends TableFunction[Long] {
   def eval(arr: Array[Long]): Unit = {
     arr.foreach(collect)
@@ -87,6 +92,7 @@ class LongExplodeTableFunc extends TableFunction[Long] {
   }
 }
 
+@SerialVersionUID(1L)
 class DoubleExplodeTableFunc extends TableFunction[Double] {
   def eval(arr: Array[Double]): Unit = {
     arr.foreach(collect)
@@ -97,6 +103,7 @@ class DoubleExplodeTableFunc extends TableFunction[Double] {
   }
 }
 
+@SerialVersionUID(1L)
 class ByteExplodeTableFunc extends TableFunction[Byte] {
   def eval(arr: Array[Byte]): Unit = {
     arr.foreach(collect)
@@ -107,6 +114,7 @@ class ByteExplodeTableFunc extends TableFunction[Byte] {
   }
 }
 
+@SerialVersionUID(1L)
 class BooleanExplodeTableFunc extends TableFunction[Boolean] {
   def eval(arr: Array[Boolean]): Unit = {
     arr.foreach(collect)
@@ -117,6 +125,7 @@ class BooleanExplodeTableFunc extends TableFunction[Boolean] {
   }
 }
 
+@SerialVersionUID(1L)
 class MapExplodeTableFunc extends TableFunction[Row] {
   def eval(map: util.Map[Object, Object]): Unit = {
     map.asScala.foreach { case (key, value) =>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/table/CalcTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/table/CalcTest.xml
index 9500570..705f41a 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/table/CalcTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/table/CalcTest.xml
@@ -19,7 +19,7 @@ limitations under the License.
   <TestCase name="testScalarFunctionAccess">
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(_c0=[org$apache$flink$table$plan$batch$table$CalcTest$giveMeCaseClass$$fe1bff2b06d8e0e495536102224cfe83().my], _c1=[org$apache$flink$table$plan$batch$table$CalcTest$giveMeCaseClass$$fe1bff2b06d8e0e495536102224cfe83().clazz], _c2=[org$apache$flink$table$plan$batch$table$CalcTest$giveMeCaseClass$$fe1bff2b06d8e0e495536102224cfe83().my], _c3=[org$apache$flink$table$plan$batch$table$CalcTest$giveMeCaseClass$$fe1bff2b06d8e0e495536102224cfe83().clazz])
+LogicalProject(_c0=[org$apache$flink$table$plan$batch$table$CalcTest$giveMeCaseClass$$18fa62c9eac49881dabdf7e951bc1e9d().my], _c1=[org$apache$flink$table$plan$batch$table$CalcTest$giveMeCaseClass$$18fa62c9eac49881dabdf7e951bc1e9d().clazz], _c2=[org$apache$flink$table$plan$batch$table$CalcTest$giveMeCaseClass$$18fa62c9eac49881dabdf7e951bc1e9d().my], _c3=[org$apache$flink$table$plan$batch$table$CalcTest$giveMeCaseClass$$18fa62c9eac49881dabdf7e951bc1e9d().clazz])
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b)]]])
 ]]>
     </Resource>
@@ -169,7 +169,7 @@ Calc(select=[a])
       <![CDATA[
 LogicalProject(EXPR$0=[$1])
 +- LogicalAggregate(group=[{4}], EXPR$0=[SUM($0)])
-   +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], k=[org$apache$flink$table$plan$batch$table$CalcTest$MyHashCode$$d14b486109d9dd062ae7c60e04977975($2)])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], k=[org$apache$flink$table$plan$batch$table$CalcTest$MyHashCode$$764ff94224d9683f52793b162a26f03a($2)])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]])
 ]]>
     </Resource>
@@ -221,7 +221,7 @@ Calc(select=[a, b])
   <TestCase name="testSelectFunction">
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(_c0=[org$apache$flink$table$plan$batch$table$CalcTest$MyHashCode$$d14b486109d9dd062ae7c60e04977975($2)], b=[$1])
+LogicalProject(_c0=[org$apache$flink$table$plan$batch$table$CalcTest$MyHashCode$$764ff94224d9683f52793b162a26f03a($2)], b=[$1])
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]])
 ]]>
     </Resource>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/table/CorrelateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/table/CorrelateTest.xml
index a6ca745..dfc6d6e 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/table/CorrelateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/table/CorrelateTest.xml
@@ -26,13 +26,13 @@ LogicalProject(c=[$0], d=[$1])
          +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
             :- LogicalProject(a=[$0], b=[$1], c=[$2])
             :  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-            +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc0$2d39fea38a8a8fb8536772fd858e67ed($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;])
+            +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc0$9430e25565030ded9d7befd51458694a($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[c, d])
-+- Correlate(invocation=[org$apache$flink$table$util$TableFunc0$2d39fea38a8a8fb8536772fd858e67ed($2)], correlate=[table(TableFunc0(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
++- Correlate(invocation=[org$apache$flink$table$util$TableFunc0$9430e25565030ded9d7befd51458694a($2)], correlate=[table(TableFunc0(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -47,13 +47,13 @@ LogicalProject(c=[$0], d=[$1])
          +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
             :- LogicalProject(a=[$0], b=[$1], c=[$2])
             :  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-            +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc0$2d39fea38a8a8fb8536772fd858e67ed($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;])
+            +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc0$9430e25565030ded9d7befd51458694a($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[c, d])
-+- Correlate(invocation=[org$apache$flink$table$util$TableFunc0$2d39fea38a8a8fb8536772fd858e67ed($2)], correlate=[table(TableFunc0(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
++- Correlate(invocation=[org$apache$flink$table$util$TableFunc0$9430e25565030ded9d7befd51458694a($2)], correlate=[table(TableFunc0(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -64,13 +64,13 @@ Calc(select=[c, d])
 LogicalProject(c=[$2], s=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc1$ad38060966060e704b09fa4c94287696($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc1$0095c6d14cbc6fc6cdbe8d8d3a9251b9($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[c, s])
-+- Correlate(invocation=[org$apache$flink$table$util$TableFunc1$ad38060966060e704b09fa4c94287696($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
++- Correlate(invocation=[org$apache$flink$table$util$TableFunc1$0095c6d14cbc6fc6cdbe8d8d3a9251b9($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -81,13 +81,13 @@ Calc(select=[c, s])
 LogicalProject(c=[$2], s=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc1$ad38060966060e704b09fa4c94287696($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc1$0095c6d14cbc6fc6cdbe8d8d3a9251b9($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[c, s])
-+- Correlate(invocation=[org$apache$flink$table$util$TableFunc1$ad38060966060e704b09fa4c94287696($2, _UTF-16LE'$')], correlate=[table(TableFunc1(c,_UTF-16LE'$'))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
++- Correlate(invocation=[org$apache$flink$table$util$TableFunc1$0095c6d14cbc6fc6cdbe8d8d3a9251b9($2, _UTF-16LE'$')], correlate=[table(TableFunc1(c,_UTF-16LE'$'))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -99,13 +99,13 @@ LogicalFilter(condition=[>($1, _UTF-16LE'')])
 +- LogicalProject(c=[$2], s=[$3])
    +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{}])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-      +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc1$ad38060966060e704b09fa4c94287696($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+      +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc1$0095c6d14cbc6fc6cdbe8d8d3a9251b9($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[c, s], where=[>(s, _UTF-16LE'')])
-+- Correlate(invocation=[org$apache$flink$table$util$TableFunc1$ad38060966060e704b09fa4c94287696($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
++- Correlate(invocation=[org$apache$flink$table$util$TableFunc1$0095c6d14cbc6fc6cdbe8d8d3a9251b9($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -116,13 +116,13 @@ Calc(select=[c, s], where=[>(s, _UTF-16LE'')])
 LogicalProject(c=[$2], s=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc1$ad38060966060e704b09fa4c94287696($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc1$0095c6d14cbc6fc6cdbe8d8d3a9251b9($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[c, s])
-+- Correlate(invocation=[org$apache$flink$table$util$TableFunc1$ad38060966060e704b09fa4c94287696($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
++- Correlate(invocation=[org$apache$flink$table$util$TableFunc1$0095c6d14cbc6fc6cdbe8d8d3a9251b9($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/table/JoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/table/JoinTest.xml
index e2d258a..38f03a5 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/table/JoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/table/JoinTest.xml
@@ -20,7 +20,7 @@ limitations under the License.
     <Resource name="planBefore">
       <![CDATA[
 LogicalFilter(condition=[>=($0, 0)])
-+- LogicalProject(c1=[org$apache$flink$table$plan$batch$table$JoinTest$Merger$$223b7380fec29c4077a893c60165d845($2, org$apache$flink$table$plan$batch$table$JoinTest$Merger$$223b7380fec29c4077a893c60165d845($2, $5))])
++- LogicalProject(c1=[org$apache$flink$table$plan$batch$table$JoinTest$Merger$$dd1e0003baf9c0bf59f98f20745ade7a($2, org$apache$flink$table$plan$batch$table$JoinTest$Merger$$dd1e0003baf9c0bf59f98f20745ade7a($2, $5))])
    +- LogicalJoin(condition=[=($1, $4)], joinType=[left])
       :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]])
       +- LogicalTableScan(table=[[default_catalog, default_database, Table2, source: [TestTableSource(d, e, f)]]])
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/table/stringexpr/CorrelateStringExpressionTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/table/stringexpr/CorrelateStringExpressionTest.xml
index 88c0563..b89015f 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/table/stringexpr/CorrelateStringExpressionTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/table/stringexpr/CorrelateStringExpressionTest.xml
@@ -22,13 +22,13 @@ limitations under the License.
 LogicalProject(c=[$2], s=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc1$ad38060966060e704b09fa4c94287696($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc1$0095c6d14cbc6fc6cdbe8d8d3a9251b9($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[c, s])
-+- Correlate(invocation=[org$apache$flink$table$util$TableFunc1$ad38060966060e704b09fa4c94287696($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
++- Correlate(invocation=[org$apache$flink$table$util$TableFunc1$0095c6d14cbc6fc6cdbe8d8d3a9251b9($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
    +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -39,13 +39,13 @@ Calc(select=[c, s])
 LogicalProject(c=[$2], s=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc1$ad38060966060e704b09fa4c94287696($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc1$0095c6d14cbc6fc6cdbe8d8d3a9251b9($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[c, s])
-+- Correlate(invocation=[org$apache$flink$table$util$TableFunc1$ad38060966060e704b09fa4c94287696($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
++- Correlate(invocation=[org$apache$flink$table$util$TableFunc1$0095c6d14cbc6fc6cdbe8d8d3a9251b9($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
    +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -56,13 +56,13 @@ Calc(select=[c, s])
 LogicalProject(c=[$2], s=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc1$ad38060966060e704b09fa4c94287696($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc1$0095c6d14cbc6fc6cdbe8d8d3a9251b9($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[c, s])
-+- Correlate(invocation=[org$apache$flink$table$util$TableFunc1$ad38060966060e704b09fa4c94287696($2, _UTF-16LE'$')], correlate=[table(TableFunc1(c,_UTF-16LE'$'))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
++- Correlate(invocation=[org$apache$flink$table$util$TableFunc1$0095c6d14cbc6fc6cdbe8d8d3a9251b9($2, _UTF-16LE'$')], correlate=[table(TableFunc1(c,_UTF-16LE'$'))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
    +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -73,13 +73,13 @@ Calc(select=[c, s])
 LogicalProject(c=[$2], name=[$3], len=[$4])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc2$b3b1f988779be024ed9386bce5019112($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc2$38e026f47155ee076ca1043e149d6ef4($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[c, name, len])
-+- Correlate(invocation=[org$apache$flink$table$util$TableFunc2$b3b1f988779be024ed9386bce5019112($2)], correlate=[table(TableFunc2(c))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER])
++- Correlate(invocation=[org$apache$flink$table$util$TableFunc2$38e026f47155ee076ca1043e149d6ef4($2)], correlate=[table(TableFunc2(c))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER])
    +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -90,13 +90,13 @@ Calc(select=[c, name, len])
 LogicalProject(c=[$2], name=[$3], len=[$5], adult=[$4])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$HierarchyTableFunction$172d96aa11f5379846a3a8c5fa560e0e($2)], rowType=[RecordType(VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$HierarchyTableFunction$171aeb1fc7238e43614fb8191a7ce40e($2)], rowType=[RecordType(VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[c, name, len, adult])
-+- Correlate(invocation=[org$apache$flink$table$util$HierarchyTableFunction$172d96aa11f5379846a3a8c5fa560e0e($2)], correlate=[table(HierarchyTableFunction(c))], select=[a,b,c,name,adult,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], joinType=[INNER])
++- Correlate(invocation=[org$apache$flink$table$util$HierarchyTableFunction$171aeb1fc7238e43614fb8191a7ce40e($2)], correlate=[table(HierarchyTableFunction(c))], select=[a,b,c,name,adult,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], joinType=[INNER])
    +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -107,13 +107,13 @@ Calc(select=[c, name, len, adult])
 LogicalProject(c=[$2], name=[$4], age=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$PojoTableFunc$b05c920aa134b36f9cfc9d9b23368bcf($2)], rowType=[RecordType(INTEGER age, VARCHAR(2147483647) name)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$PojoTableFunc$3db5a1277e6c51780b70d7609eb1f4f5($2)], rowType=[RecordType(INTEGER age, VARCHAR(2147483647) name)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[c, name, age])
-+- Correlate(invocation=[org$apache$flink$table$util$PojoTableFunc$b05c920aa134b36f9cfc9d9b23368bcf($2)], correlate=[table(PojoTableFunc(c))], select=[a,b,c,age,name], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER age, VARCHAR(2147483647) name)], joinType=[INNER])
++- Correlate(invocation=[org$apache$flink$table$util$PojoTableFunc$3db5a1277e6c51780b70d7609eb1f4f5($2)], correlate=[table(PojoTableFunc(c))], select=[a,b,c,age,name], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER age, VARCHAR(2147483647) name)], joinType=[INNER])
    +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -125,13 +125,13 @@ LogicalFilter(condition=[>($2, 2)])
 +- LogicalProject(c=[$2], name=[$3], len=[$4])
    +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
       :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]])
-      +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc2$b3b1f988779be024ed9386bce5019112($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;])
+      +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc2$38e026f47155ee076ca1043e149d6ef4($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[c, name, len])
-+- Correlate(invocation=[org$apache$flink$table$util$TableFunc2$b3b1f988779be024ed9386bce5019112($2)], correlate=[table(TableFunc2(c))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER], condition=[>($1, 2)])
++- Correlate(invocation=[org$apache$flink$table$util$TableFunc2$38e026f47155ee076ca1043e149d6ef4($2)], correlate=[table(TableFunc2(c))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER], condition=[>($1, 2)])
    +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -142,13 +142,13 @@ Calc(select=[c, name, len])
 LogicalProject(a=[$0], c=[$2], s=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc1$ad38060966060e704b09fa4c94287696(SUBSTRING($2, 2))], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc1$0095c6d14cbc6fc6cdbe8d8d3a9251b9(SUBSTRING($2, 2))], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[a, c, s])
-+- Correlate(invocation=[org$apache$flink$table$util$TableFunc1$ad38060966060e704b09fa4c94287696(SUBSTRING($2, 2))], correlate=[table(TableFunc1(SUBSTRING(c, 2)))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
++- Correlate(invocation=[org$apache$flink$table$util$TableFunc1$0095c6d14cbc6fc6cdbe8d8d3a9251b9(SUBSTRING($2, 2))], correlate=[table(TableFunc1(SUBSTRING(c, 2)))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
    +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.xml
index 2459643..df61619 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.xml
@@ -164,7 +164,7 @@ LogicalProject(name=[$0], id=[$1], amount=[$2], price=[$3])
     <Resource name="planAfter">
       <![CDATA[
 LogicalProject(name=[$0], id=[$1], amount=[$2], price=[$3])
-+- LogicalFilter(condition=[<(org$apache$flink$table$expressions$utils$Func1$$a39386268ffec8461452460bcbe089ad($2), 32)])
++- LogicalFilter(condition=[<(org$apache$flink$table$expressions$utils$Func1$$832880661df998837536377b638c1c45($2), 32)])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [filter=[greaterThan(amount, 2)]]]])
 ]]>
     </Resource>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/table/CalcTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/table/CalcTest.xml
index 1191d18..967049b 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/table/CalcTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/table/CalcTest.xml
@@ -62,7 +62,7 @@ Calc(select=[a, b, CAST(_UTF-16LE'xx':VARCHAR(2147483647) CHARACTER SET "UTF-16L
   <TestCase name="testSimpleMap">
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(_c0=[org$apache$flink$table$expressions$utils$Func23$$de6190eff5cfcd5dd1d5877a871e2387($0, $1, $2).f0], _c1=[org$apache$flink$table$expressions$utils$Func23$$de6190eff5cfcd5dd1d5877a871e2387($0, $1, $2).f1], _c2=[org$apache$flink$table$expressions$utils$Func23$$de6190eff5cfcd5dd1d5877a871e2387($0, $1, $2).f2], _c3=[org$apache$flink$table$expressions$utils$Func23$$de6190eff5cfcd5dd1d5877a871e2387($0, $1, $2).f3])
+LogicalProject(_c0=[org$apache$flink$table$expressions$utils$Func23$$a4f0bc5e2c8c15477f8cf8746a7bb883($0, $1, $2).f0], _c1=[org$apache$flink$table$expressions$utils$Func23$$a4f0bc5e2c8c15477f8cf8746a7bb883($0, $1, $2).f1], _c2=[org$apache$flink$table$expressions$utils$Func23$$a4f0bc5e2c8c15477f8cf8746a7bb883($0, $1, $2).f2], _c3=[org$apache$flink$table$expressions$utils$Func23$$a4f0bc5e2c8c15477f8cf8746a7bb883($0, $1, $2).f3])
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
@@ -93,7 +93,7 @@ Calc(select=[a, b], where=[AND(>(a, 0), <(b, 2), =(MOD(a, 2), 1))])
   <TestCase name="testMultiMap">
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(_c0=[org$apache$flink$table$expressions$utils$Func24$$4d71da721f8fba30223be1cd2b5af2ce(org$apache$flink$table$expressions$utils$Func23$$de6190eff5cfcd5dd1d5877a871e2387($0, $1, $2).f0, org$apache$flink$table$expressions$utils$Func23$$de6190eff5cfcd5dd1d5877a871e2387($0, $1, $2).f1, org$apache$flink$table$expressions$utils$Func23$$de6190eff5cfcd5dd1d5877a871e2387($0, $1, $2).f2, org$apache$flink$table$expressions$utils$Func23$$de6190eff5cfcd5dd1d5877a871e2387($0, $1, $2).f3 [...]
+LogicalProject(_c0=[org$apache$flink$table$expressions$utils$Func24$$83ddcfa3f176335a37949e30e769b539(org$apache$flink$table$expressions$utils$Func23$$a4f0bc5e2c8c15477f8cf8746a7bb883($0, $1, $2).f0, org$apache$flink$table$expressions$utils$Func23$$a4f0bc5e2c8c15477f8cf8746a7bb883($0, $1, $2).f1, org$apache$flink$table$expressions$utils$Func23$$a4f0bc5e2c8c15477f8cf8746a7bb883($0, $1, $2).f2, org$apache$flink$table$expressions$utils$Func23$$a4f0bc5e2c8c15477f8cf8746a7bb883($0, $1, $2).f3 [...]
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
@@ -122,7 +122,7 @@ Calc(select=[a, b, c], where=[OR(NOT IN(b, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12
   <TestCase name="testScalarResult">
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(_c0=[org$apache$flink$table$expressions$utils$Func1$$a39386268ffec8461452460bcbe089ad($0)])
+LogicalProject(_c0=[org$apache$flink$table$expressions$utils$Func1$$832880661df998837536377b638c1c45($0)])
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/table/ColumnFunctionsTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/table/ColumnFunctionsTest.xml
index 420f6ee..2882119 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/table/ColumnFunctionsTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/table/ColumnFunctionsTest.xml
@@ -19,7 +19,7 @@ limitations under the License.
   <TestCase name="testAddColumns">
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(a=[$0], b=[$1], c=[$2], d=[org$apache$flink$table$plan$stream$table$TestFunc$$fd4dfa9e9ae53c7b8d0f13d2db94ac9b($0, $1)])
+LogicalProject(a=[$0], b=[$1], c=[$2], d=[org$apache$flink$table$plan$stream$table$TestFunc$$2c242c3bf97088f3940b309439746678($0, $1)])
 +- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(a, b, c)]]])
 ]]>
     </Resource>
@@ -153,12 +153,12 @@ Join(joinType=[InnerJoin], where=[=(int1, int2)], select=[int1, long1, string1,
       <![CDATA[
 LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
 :- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(int, long, string)]]])
-+- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc0$2d39fea38a8a8fb8536772fd858e67ed($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age)], elementType=[class [Ljava.lang.Object;])
++- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc0$9430e25565030ded9d7befd51458694a($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Correlate(invocation=[org$apache$flink$table$util$TableFunc0$2d39fea38a8a8fb8536772fd858e67ed($2)], correlate=[table(TableFunc0(string))], select=[int,long,string,name,age], rowType=[RecordType(DOUBLE int, BIGINT long, VARCHAR(2147483647) string, VARCHAR(2147483647) name, INTEGER age)], joinType=[INNER])
+Correlate(invocation=[org$apache$flink$table$util$TableFunc0$9430e25565030ded9d7befd51458694a($2)], correlate=[table(TableFunc0(string))], select=[int,long,string,name,age], rowType=[RecordType(DOUBLE int, BIGINT long, VARCHAR(2147483647) string, VARCHAR(2147483647) name, INTEGER age)], joinType=[INNER])
 +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(int, long, string)]]], fields=[int, long, string])
 ]]>
     </Resource>
@@ -196,7 +196,7 @@ Calc(select=[a AS d, b])
   <TestCase name="testStar">
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(_c0=[org$apache$flink$table$plan$stream$table$TestFunc$$fd4dfa9e9ae53c7b8d0f13d2db94ac9b($0, $1)])
+LogicalProject(_c0=[org$apache$flink$table$plan$stream$table$TestFunc$$2c242c3bf97088f3940b309439746678($0, $1)])
 +- LogicalTableScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(double, long)]]])
 ]]>
     </Resource>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/table/CorrelateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/table/CorrelateTest.xml
index 8740858..d15d21a 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/table/CorrelateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/table/CorrelateTest.xml
@@ -26,13 +26,13 @@ LogicalProject(c=[$0], d=[$1])
          +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
             :- LogicalProject(a=[$0], b=[$1], c=[$2])
             :  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-            +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc0$2d39fea38a8a8fb8536772fd858e67ed($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;])
+            +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc0$9430e25565030ded9d7befd51458694a($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[c, d])
-+- Correlate(invocation=[org$apache$flink$table$util$TableFunc0$2d39fea38a8a8fb8536772fd858e67ed($2)], correlate=[table(TableFunc0(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
++- Correlate(invocation=[org$apache$flink$table$util$TableFunc0$9430e25565030ded9d7befd51458694a($2)], correlate=[table(TableFunc0(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -47,13 +47,13 @@ LogicalProject(c=[$0], d=[$1])
          +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
             :- LogicalProject(a=[$0], b=[$1], c=[$2])
             :  +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-            +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc0$2d39fea38a8a8fb8536772fd858e67ed($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;])
+            +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc0$9430e25565030ded9d7befd51458694a($2)], rowType=[RecordType(VARCHAR(2147483647) d, INTEGER e)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[c, d])
-+- Correlate(invocation=[org$apache$flink$table$util$TableFunc0$2d39fea38a8a8fb8536772fd858e67ed($2)], correlate=[table(TableFunc0(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
++- Correlate(invocation=[org$apache$flink$table$util$TableFunc0$9430e25565030ded9d7befd51458694a($2)], correlate=[table(TableFunc0(c))], select=[a,b,c,d,e], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) d, INTEGER e)], joinType=[INNER], condition=[>($1, 20)])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -64,13 +64,13 @@ Calc(select=[c, d])
 LogicalProject(c=[$2], s=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc1$ad38060966060e704b09fa4c94287696($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc1$0095c6d14cbc6fc6cdbe8d8d3a9251b9($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[c, s])
-+- Correlate(invocation=[org$apache$flink$table$util$TableFunc1$ad38060966060e704b09fa4c94287696($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
++- Correlate(invocation=[org$apache$flink$table$util$TableFunc1$0095c6d14cbc6fc6cdbe8d8d3a9251b9($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -81,13 +81,13 @@ Calc(select=[c, s])
 LogicalProject(c=[$2], s=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc1$ad38060966060e704b09fa4c94287696($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc1$0095c6d14cbc6fc6cdbe8d8d3a9251b9($2, _UTF-16LE'$')], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[c, s])
-+- Correlate(invocation=[org$apache$flink$table$util$TableFunc1$ad38060966060e704b09fa4c94287696($2, _UTF-16LE'$')], correlate=[table(TableFunc1(c,_UTF-16LE'$'))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
++- Correlate(invocation=[org$apache$flink$table$util$TableFunc1$0095c6d14cbc6fc6cdbe8d8d3a9251b9($2, _UTF-16LE'$')], correlate=[table(TableFunc1(c,_UTF-16LE'$'))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -98,13 +98,13 @@ Calc(select=[c, s])
 LogicalProject(c=[$2], name=[$3], len=[$4])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc2$b3b1f988779be024ed9386bce5019112(org$apache$flink$table$expressions$utils$Func13$054570f6203667830dd24328319ff13c($2))], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc2$38e026f47155ee076ca1043e149d6ef4(org$apache$flink$table$expressions$utils$Func13$96c3040db24d3c59d1f1dd1aae2751bf($2))], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[c, name, len])
-+- Correlate(invocation=[org$apache$flink$table$util$TableFunc2$b3b1f988779be024ed9386bce5019112(org$apache$flink$table$expressions$utils$Func13$054570f6203667830dd24328319ff13c($2))], correlate=[table(TableFunc2(Func13(c)))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER])
++- Correlate(invocation=[org$apache$flink$table$util$TableFunc2$38e026f47155ee076ca1043e149d6ef4(org$apache$flink$table$expressions$utils$Func13$96c3040db24d3c59d1f1dd1aae2751bf($2))], correlate=[table(TableFunc2(Func13(c)))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -116,13 +116,13 @@ LogicalFilter(condition=[>($2, 2)])
 +- LogicalProject(c=[$2], name=[$3], len=[$4])
    +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
       :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-      +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc2$b3b1f988779be024ed9386bce5019112($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;])
+      +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc2$38e026f47155ee076ca1043e149d6ef4($2)], rowType=[RecordType(VARCHAR(2147483647) name, INTEGER len)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[c, name, len])
-+- Correlate(invocation=[org$apache$flink$table$util$TableFunc2$b3b1f988779be024ed9386bce5019112($2)], correlate=[table(TableFunc2(c))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER], condition=[>($1, 2)])
++- Correlate(invocation=[org$apache$flink$table$util$TableFunc2$38e026f47155ee076ca1043e149d6ef4($2)], correlate=[table(TableFunc2(c))], select=[a,b,c,name,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, INTEGER len)], joinType=[INNER], condition=[>($1, 2)])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -132,12 +132,12 @@ Calc(select=[c, name, len])
       <![CDATA[
 LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
 :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-+- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$HierarchyTableFunction$172d96aa11f5379846a3a8c5fa560e0e($2)], rowType=[RecordType(VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], elementType=[class [Ljava.lang.Object;])
++- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$HierarchyTableFunction$171aeb1fc7238e43614fb8191a7ce40e($2)], rowType=[RecordType(VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Correlate(invocation=[org$apache$flink$table$util$HierarchyTableFunction$172d96aa11f5379846a3a8c5fa560e0e($2)], correlate=[table(HierarchyTableFunction(c))], select=[a,b,c,name,adult,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], joinType=[INNER])
+Correlate(invocation=[org$apache$flink$table$util$HierarchyTableFunction$171aeb1fc7238e43614fb8191a7ce40e($2)], correlate=[table(HierarchyTableFunction(c))], select=[a,b,c,name,adult,len], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)], joinType=[INNER])
 +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -148,13 +148,13 @@ Correlate(invocation=[org$apache$flink$table$util$HierarchyTableFunction$172d96a
 LogicalProject(f0=[AS($3, _UTF-16LE'f0')], f1=[AS($4, _UTF-16LE'f1')])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(f1, f2, f3)]]])
-   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc2$b3b1f988779be024ed9386bce5019112($2)], rowType=[RecordType(VARCHAR(2147483647) f0, INTEGER f1_0)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc2$38e026f47155ee076ca1043e149d6ef4($2)], rowType=[RecordType(VARCHAR(2147483647) f0, INTEGER f1_0)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[f0, f1_0 AS f1])
-+- Correlate(invocation=[org$apache$flink$table$util$TableFunc2$b3b1f988779be024ed9386bce5019112($2)], correlate=[table(TableFunc2(f3))], select=[f1,f2,f3,f0,f1_0], rowType=[RecordType(INTEGER f1, BIGINT f2, VARCHAR(2147483647) f3, VARCHAR(2147483647) f0, INTEGER f1_0)], joinType=[INNER])
++- Correlate(invocation=[org$apache$flink$table$util$TableFunc2$38e026f47155ee076ca1043e149d6ef4($2)], correlate=[table(TableFunc2(f3))], select=[f1,f2,f3,f0,f1_0], rowType=[RecordType(INTEGER f1, BIGINT f2, VARCHAR(2147483647) f3, VARCHAR(2147483647) f0, INTEGER f1_0)], joinType=[INNER])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(f1, f2, f3)]]], fields=[f1, f2, f3])
 ]]>
     </Resource>
@@ -165,13 +165,13 @@ Calc(select=[f0, f1_0 AS f1])
 LogicalProject(c=[$2], s=[$3])
 +- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{}])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc1$ad38060966060e704b09fa4c94287696($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
+   +- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc1$0095c6d14cbc6fc6cdbe8d8d3a9251b9($2)], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Calc(select=[c, s])
-+- Correlate(invocation=[org$apache$flink$table$util$TableFunc1$ad38060966060e704b09fa4c94287696($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
++- Correlate(invocation=[org$apache$flink$table$util$TableFunc1$0095c6d14cbc6fc6cdbe8d8d3a9251b9($2)], correlate=[table(TableFunc1(c))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[LEFT])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -181,12 +181,12 @@ Calc(select=[c, s])
       <![CDATA[
 LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
 :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-+- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc1$ad38060966060e704b09fa4c94287696(SUBSTRING($2, 2))], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
++- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$TableFunc1$0095c6d14cbc6fc6cdbe8d8d3a9251b9(SUBSTRING($2, 2))], rowType=[RecordType(VARCHAR(2147483647) s)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Correlate(invocation=[org$apache$flink$table$util$TableFunc1$ad38060966060e704b09fa4c94287696(SUBSTRING($2, 2))], correlate=[table(TableFunc1(SUBSTRING(c, 2)))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
+Correlate(invocation=[org$apache$flink$table$util$TableFunc1$0095c6d14cbc6fc6cdbe8d8d3a9251b9(SUBSTRING($2, 2))], correlate=[table(TableFunc1(SUBSTRING(c, 2)))], select=[a,b,c,s], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)], joinType=[INNER])
 +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
@@ -196,12 +196,12 @@ Correlate(invocation=[org$apache$flink$table$util$TableFunc1$ad38060966060e704b0
       <![CDATA[
 LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
 :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
-+- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$PojoTableFunc$b05c920aa134b36f9cfc9d9b23368bcf($2)], rowType=[RecordType(INTEGER age, VARCHAR(2147483647) name)], elementType=[class [Ljava.lang.Object;])
++- LogicalTableFunctionScan(invocation=[org$apache$flink$table$util$PojoTableFunc$3db5a1277e6c51780b70d7609eb1f4f5($2)], rowType=[RecordType(INTEGER age, VARCHAR(2147483647) name)], elementType=[class [Ljava.lang.Object;])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Correlate(invocation=[org$apache$flink$table$util$PojoTableFunc$b05c920aa134b36f9cfc9d9b23368bcf($2)], correlate=[table(PojoTableFunc(c))], select=[a,b,c,age,name], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER age, VARCHAR(2147483647) name)], joinType=[INNER])
+Correlate(invocation=[org$apache$flink$table$util$PojoTableFunc$3db5a1277e6c51780b70d7609eb1f4f5($2)], correlate=[table(PojoTableFunc(c))], select=[a,b,c,age,name], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, INTEGER age, VARCHAR(2147483647) name)], joinType=[INNER])
 +- TableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
 ]]>
     </Resource>
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/table/OverWindowTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/table/OverWindowTest.xml
index 4f3cb39..7e81bcb 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/table/OverWindowTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/table/OverWindowTest.xml
@@ -291,7 +291,7 @@ Calc(select=[a, c, w0$o0 AS _c2, w0$o1 AS wAvg])
   <TestCase name="testScalarFunctionsOnOverWindow">
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(d=[AS(org$apache$flink$table$expressions$utils$Func1$$a39386268ffec8461452460bcbe089ad(AS(SUM($0) OVER (PARTITION BY $1 ORDER BY $3 NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), _UTF-16LE'wsum')), _UTF-16LE'd')], _c1=[AS(EXP(CAST(COUNT($0) OVER (PARTITION BY $1 ORDER BY $3 NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)):DOUBLE), _UTF-16LE'_c1')], _c2=[AS(+(org$apache$flink$table$plan$util$JavaUserDefinedAggFunctions$WeightedAvgWithRetract$c7 [...]
+LogicalProject(d=[AS(org$apache$flink$table$expressions$utils$Func1$$832880661df998837536377b638c1c45(AS(SUM($0) OVER (PARTITION BY $1 ORDER BY $3 NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), _UTF-16LE'wsum')), _UTF-16LE'd')], _c1=[AS(EXP(CAST(COUNT($0) OVER (PARTITION BY $1 ORDER BY $3 NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)):DOUBLE), _UTF-16LE'_c1')], _c2=[AS(+(org$apache$flink$table$plan$util$JavaUserDefinedAggFunctions$WeightedAvgWithRetract$c7 [...]
 +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
index 3ea44b3..29ee5a5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
@@ -38,12 +38,14 @@ import scala.io.Source
 
 case class SimplePojo(name: String, age: Int)
 
+@SerialVersionUID(1L)
 object Func0 extends ScalarFunction {
   def eval(index: Int): Int = {
     index
   }
 }
 
+@SerialVersionUID(1L)
 object Func1 extends ScalarFunction {
   def eval(index: Integer): Integer = {
     index + 1
@@ -56,42 +58,49 @@ object Func1 extends ScalarFunction {
   def eval(f: Float): Float = f + 1
 }
 
+@SerialVersionUID(1L)
 object Func2 extends ScalarFunction {
   def eval(index: Integer, str: String, pojo: SimplePojo): String = {
     s"$index and $str and $pojo"
   }
 }
 
+@SerialVersionUID(1L)
 object Func3 extends ScalarFunction {
   def eval(index: Integer, str: String): String = {
     s"$index and $str"
   }
 }
 
+@SerialVersionUID(1L)
 object Func4 extends ScalarFunction {
   def eval(): Integer = {
     null
   }
 }
 
+@SerialVersionUID(1L)
 object Func5 extends ScalarFunction {
   def eval(): Int = {
     -1
   }
 }
 
+@SerialVersionUID(1L)
 object Func6 extends ScalarFunction {
   def eval(date: Date, time: Time, timestamp: Timestamp): (Date, Time, Timestamp) = {
     (date, time, timestamp)
   }
 }
 
+@SerialVersionUID(1L)
 object Func7 extends ScalarFunction {
   def eval(a: Integer, b: Integer): Integer = {
     a + b
   }
 }
 
+@SerialVersionUID(1L)
 object Func8 extends ScalarFunction {
   def eval(a: Int): String = {
     "a"
@@ -106,12 +115,14 @@ object Func8 extends ScalarFunction {
   }
 }
 
+@SerialVersionUID(1L)
 object Func9 extends ScalarFunction {
   def eval(a: Int, b: Int, c: Long): String = {
     s"$a and $b and $c"
   }
 }
 
+@SerialVersionUID(1L)
 object Func10 extends ScalarFunction {
   def eval(c: Long): Long = {
     c
@@ -121,12 +132,14 @@ object Func10 extends ScalarFunction {
     SqlTimeTypeInfo.TIMESTAMP
 }
 
+@SerialVersionUID(1L)
 object Func11 extends ScalarFunction {
   def eval(a: Int, b: Long): String = {
     s"$a and $b"
   }
 }
 
+@SerialVersionUID(1L)
 object Func12 extends ScalarFunction {
   def eval(a: Long): Long = {
     a
@@ -137,12 +150,14 @@ object Func12 extends ScalarFunction {
   }
 }
 
+@SerialVersionUID(1L)
 object ShouldNotExecuteFunc extends ScalarFunction {
   def eval(s: String): Boolean = {
     throw new Exception("This func should never be executed")
   }
 }
 
+@SerialVersionUID(1L)
 class RichFunc0 extends ScalarFunction {
   var openCalled = false
   var closeCalled = false
@@ -183,6 +198,7 @@ class RichFunc0 extends ScalarFunction {
   }
 }
 
+@SerialVersionUID(1L)
 class RichFunc1 extends ScalarFunction {
   var added: Int = Int.MaxValue
 
@@ -199,6 +215,7 @@ class RichFunc1 extends ScalarFunction {
   }
 }
 
+@SerialVersionUID(1L)
 class RichFunc2 extends ScalarFunction {
   var prefix = "ERROR_VALUE"
 
@@ -215,6 +232,7 @@ class RichFunc2 extends ScalarFunction {
   }
 }
 
+@SerialVersionUID(1L)
 class RichFunc3 extends ScalarFunction {
   private val words = mutable.HashSet[String]()
 
@@ -234,12 +252,14 @@ class RichFunc3 extends ScalarFunction {
   }
 }
 
+@SerialVersionUID(1L)
 class Func13(prefix: String) extends ScalarFunction {
   def eval(a: String): String = {
     s"$prefix-$a"
   }
 }
 
+@SerialVersionUID(1L)
 object Func14 extends ScalarFunction {
 
   @varargs
@@ -248,6 +268,7 @@ object Func14 extends ScalarFunction {
   }
 }
 
+@SerialVersionUID(1L)
 object Func15 extends ScalarFunction {
 
   @varargs
@@ -260,6 +281,7 @@ object Func15 extends ScalarFunction {
   }
 }
 
+@SerialVersionUID(1L)
 object Func16 extends ScalarFunction {
 
   def eval(a: Seq[String]): String = {
@@ -267,6 +289,7 @@ object Func16 extends ScalarFunction {
   }
 }
 
+@SerialVersionUID(1L)
 object Func17 extends ScalarFunction {
 
   // Without @varargs, we will throw an exception
@@ -275,12 +298,14 @@ object Func17 extends ScalarFunction {
   }
 }
 
+@SerialVersionUID(1L)
 object Func18 extends ScalarFunction {
   def eval(str: String, prefix: String): Boolean = {
     str.startsWith(prefix)
   }
 }
 
+@SerialVersionUID(1L)
 object Func19 extends ScalarFunction {
   def eval(obj: Object): Int = {
     if (null != obj) {
@@ -299,6 +324,7 @@ object Func19 extends ScalarFunction {
   }
 }
 
+@SerialVersionUID(1L)
 object Func20 extends ScalarFunction {
   def eval(row: Row): Row = {
     row
@@ -315,6 +341,7 @@ object Func20 extends ScalarFunction {
   }
 }
 
+@SerialVersionUID(1L)
 object Func23 extends ScalarFunction {
   def eval(a: Integer, b: JLong, c: String): Row = {
     Row.of("star", a, b, c)
@@ -324,6 +351,7 @@ object Func23 extends ScalarFunction {
     Types.ROW(Types.STRING, Types.INT, Types.LONG, Types.STRING)
 }
 
+@SerialVersionUID(1L)
 object Func25 extends ScalarFunction {
   private val random = new Random()
 
@@ -338,6 +366,7 @@ object Func25 extends ScalarFunction {
     Types.ROW(Types.INT, Types.INT)
 }
 
+@SerialVersionUID(1L)
 object Func24 extends ScalarFunction {
   def eval(a: String, b: Integer, c: JLong, d: String): Row = {
     Row.of(a, Integer.valueOf(b + 1), c, d)
@@ -350,6 +379,7 @@ object Func24 extends ScalarFunction {
 /**
   * A scalar function that always returns TRUE if opened correctly.
   */
+@SerialVersionUID(1L)
 class FuncWithOpen extends ScalarFunction {
 
   private var permitted: Boolean = false
@@ -367,6 +397,7 @@ class FuncWithOpen extends ScalarFunction {
   }
 }
 
+@SerialVersionUID(1L)
 class SplitUDF(deterministic: Boolean) extends ScalarFunction {
   def eval(x: String, sep: String, index: Int): String = {
     val splits = StringUtils.splitByWholeSeparator(x, sep)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/table/CalcTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/table/CalcTest.scala
index 354e0d1..ed79b00 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/table/CalcTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/table/CalcTest.scala
@@ -183,6 +183,7 @@ object CalcTest {
 
   case class TestCaseClass(my: String, clazz: Int)
 
+  @SerialVersionUID(1L)
   object giveMeCaseClass extends ScalarFunction {
     def eval(): TestCaseClass = {
       TestCaseClass("hello", 42)
@@ -193,6 +194,7 @@ object CalcTest {
     }
   }
 
+  @SerialVersionUID(1L)
   object MyHashCode extends ScalarFunction {
     def eval(s: String): Int = s.hashCode()
   }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/table/ColumnFunctionsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/table/ColumnFunctionsTest.scala
index 90cc7a5..0c689e5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/table/ColumnFunctionsTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/table/ColumnFunctionsTest.scala
@@ -44,6 +44,7 @@ class ColumnFunctionsTest extends TableTestBase {
   }
 }
 
+@SerialVersionUID(1L)
 object TestFunc extends ScalarFunction {
   def eval(a: Double, b: Long): Double = {
     a
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/table/JoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/table/JoinTest.scala
index ca74b0e7..d1a2a01 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/table/JoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/table/JoinTest.scala
@@ -203,6 +203,7 @@ class JoinTest extends TableTestBase {
 
 object JoinTest {
 
+  @SerialVersionUID(1L)
   object Merger extends ScalarFunction {
     def eval(f0: Int, f1: Int): Int = {
       f0 + f1
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/ModifiedMonotonicityTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/ModifiedMonotonicityTest.scala
index 66705e0..dd11055 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/ModifiedMonotonicityTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/ModifiedMonotonicityTest.scala
@@ -264,6 +264,7 @@ class ModifiedMonotonicityTest extends TableTestBase {
   }
 }
 
+@SerialVersionUID(1L)
 class Func1 extends ScalarFunction {
   def eval(str: String): String = {
     s"$str"
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
index d01616a..1666fa0 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
@@ -425,57 +425,67 @@ class TestInvalidTemporalTable private(
   override def isAsyncEnabled: Boolean = async
 }
 
+@SerialVersionUID(1L)
 class InvalidTableFunctionResultType extends TableFunction[String] {
   @varargs
   def eval(obj: AnyRef*): Unit = {
   }
 }
 
+@SerialVersionUID(1L)
 class InvalidTableFunctionEvalSignature1 extends TableFunction[BaseRow] {
   def eval(a: Integer, b: String, c: LocalDateTime): Unit = {
   }
 }
 
+@SerialVersionUID(1L)
 class ValidTableFunction extends TableFunction[BaseRow] {
   @varargs
   def eval(obj: AnyRef*): Unit = {
   }
 }
 
+@SerialVersionUID(1L)
 class ValidTableFunction2 extends TableFunction[Row] {
   def eval(a: Integer, b: String, c: LocalDateTime): Unit = {
   }
 }
 
+@SerialVersionUID(1L)
 class InvalidAsyncTableFunctionResultType extends AsyncTableFunction[Row] {
   @varargs
   def eval(obj: AnyRef*): Unit = {
   }
 }
 
+@SerialVersionUID(1L)
 class InvalidAsyncTableFunctionEvalSignature1 extends AsyncTableFunction[BaseRow] {
   def eval(a: Integer, b: BinaryString, c: LocalDateTime): Unit = {
   }
 }
 
+@SerialVersionUID(1L)
 class InvalidAsyncTableFunctionEvalSignature2 extends AsyncTableFunction[BaseRow] {
   def eval(resultFuture: CompletableFuture[JCollection[BaseRow]],
     a: Integer, b: String,  c: LocalDateTime): Unit = {
   }
 }
 
+@SerialVersionUID(1L)
 class InvalidAsyncTableFunctionEvalSignature3 extends AsyncTableFunction[BaseRow] {
   def eval(resultFuture: ResultFuture[BaseRow],
     a: Integer, b: BinaryString,  c: JLong): Unit = {
   }
 }
 
+@SerialVersionUID(1L)
 class ValidAsyncTableFunction extends AsyncTableFunction[BaseRow] {
   @varargs
   def eval(resultFuture: CompletableFuture[JCollection[BaseRow]], objs: AnyRef*): Unit = {
   }
 }
 
+@SerialVersionUID(1L)
 class ValidAsyncTableFunction2 extends AsyncTableFunction[BaseRow] {
   def eval(resultFuture: CompletableFuture[JCollection[BaseRow]],
     a: Integer, b: BinaryString, c: JLong): Unit = {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/table/ColumnFunctionsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/table/ColumnFunctionsTest.scala
index f3a11dc..b809337 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/table/ColumnFunctionsTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/table/ColumnFunctionsTest.scala
@@ -221,6 +221,7 @@ class ColumnFunctionsTest extends TableTestBase {
   }
 }
 
+@SerialVersionUID(1L)
 object TestFunc extends ScalarFunction {
   def eval(a: Double, b: Long): Double = {
     a
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/CorrelateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/CorrelateITCase.scala
index c48b0bb..77c82a8 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/CorrelateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/CorrelateITCase.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.runtime.batch.sql
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO
-import org.apache.flink.api.common.typeinfo.{LocalTimeTypeInfo, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{LocalTimeTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.{PojoField, PojoTypeInfo, RowTypeInfo, TypeExtractor}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.typeutils.Types
@@ -300,6 +300,7 @@ class CorrelateITCase extends BatchTestBase {
 //  }
 }
 
+@SerialVersionUID(1L)
 object StringUdFunc extends ScalarFunction {
   def eval(s: String): String = s
 }
@@ -320,6 +321,7 @@ object TableFunctionITCase {
   )
 }
 
+@SerialVersionUID(1L)
 class MyPojoTableFunc extends TableFunction[Int] {
   def eval(s: MyPojo): Unit = collect(s.f2)
 
@@ -331,6 +333,7 @@ class MyPojoTableFunc extends TableFunction[Int] {
   }
 }
 
+@SerialVersionUID(1L)
 class MyToPojoTableFunc extends TableFunction[MyPojo] {
   def eval(s: Int): Unit = collect(new MyPojo(s, s))
 
@@ -342,6 +345,7 @@ class MyToPojoTableFunc extends TableFunction[MyPojo] {
   }
 }
 
+@SerialVersionUID(1L)
 class GenericTableFunc[T](t: TypeInformation[T]) extends TableFunction[T] {
   def eval(s: Int): Unit = {
     if (t == Types.STRING) {
@@ -356,6 +360,7 @@ class GenericTableFunc[T](t: TypeInformation[T]) extends TableFunction[T] {
   override def getResultType: TypeInformation[T] = t
 }
 
+@SerialVersionUID(1L)
 class BinaryStringTableFunc extends TableFunction[Row] {
   def eval(s: BinaryString, cons: BinaryString): Unit = collect(Row.of(s, cons))
   override def getResultType: TypeInformation[Row] = {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
index 5aa8f17..4f382f0 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
@@ -652,10 +652,12 @@ class CalcITCase extends BatchTestBase {
 
 }
 
+@SerialVersionUID(1L)
 object HashCode extends ScalarFunction {
   def eval(s: String): Int = s.hashCode
 }
 
+@SerialVersionUID(1L)
 object OldHashCode extends ScalarFunction {
   def eval(s: String): Int = -1
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
index b3e0c0e..76205d9 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
@@ -737,12 +737,14 @@ class MatchRecognizeITCase(backend: StateBackendMode) extends StreamingWithState
   }
 }
 
+@SerialVersionUID(1L)
 class ToMillis extends ScalarFunction {
   def eval(t: Timestamp): Long = {
     t.toInstant.toEpochMilli + TimeZone.getDefault.getOffset(t.toInstant.toEpochMilli)
   }
 }
 
+@SerialVersionUID(1L)
 private class PrefixingScalarFunc extends ScalarFunction {
 
   private var prefix = "ERROR_VALUE"
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
index adeaa8c..83fe5dc 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
@@ -19,21 +19,21 @@
 package org.apache.flink.table.runtime.stream.table
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.{ExecutionConfigOptions, TableException}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.expressions.utils.{Func1, Func13, Func23, Func24, Func25, RichFunc1, RichFunc2}
 import org.apache.flink.table.functions.ScalarFunction
-import org.apache.flink.table.runtime.utils.{StreamingWithStateTestBase, TestingAppendSink, TestingRetractSink, UserDefinedFunctionTestUtils}
 import org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.StateBackendMode
 import org.apache.flink.table.runtime.utils.TestData._
+import org.apache.flink.table.runtime.utils.{StreamingWithStateTestBase, TestingAppendSink, TestingRetractSink, UserDefinedFunctionTestUtils}
 import org.apache.flink.types.Row
 
-import scala.collection.{Seq, mutable}
 import org.junit.Assert._
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
+import scala.collection.{Seq, mutable}
+
 @RunWith(classOf[Parameterized])
 class CalcITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode) {
 
@@ -452,7 +452,7 @@ class CalcITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
   }
 }
 
-
+@SerialVersionUID(1L)
 object TestUDFLength extends ScalarFunction {
 
   // testing eval function with throws clause
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/InMemoryLookupableTableSource.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/InMemoryLookupableTableSource.scala
index 2a65683..39f57cd 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/InMemoryLookupableTableSource.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/InMemoryLookupableTableSource.scala
@@ -219,6 +219,7 @@ object InMemoryLookupableTableSource {
   /**
     * An async lookup function which find matched rows with the given fields.
     */
+  @SerialVersionUID(1L)
   private class InMemoryAsyncLookupFunction(
       data: Map[Row, List[Row]],
       resourceCounter: AtomicInteger,
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/UserDefinedFunctionTestUtils.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/UserDefinedFunctionTestUtils.scala
index 79f61e0..c9ca0e7 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/UserDefinedFunctionTestUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/UserDefinedFunctionTestUtils.scala
@@ -181,50 +181,62 @@ object UserDefinedFunctionTestUtils {
   // ScalarFunctions
   // ------------------------------------------------------------------------------------
 
+  @SerialVersionUID(1L)
   object MyHashCode extends ScalarFunction {
     def eval(s: String): Int = s.hashCode()
   }
 
+  @SerialVersionUID(1L)
   object OldHashCode extends ScalarFunction {
     def eval(s: String): Int = -1
   }
 
+  @SerialVersionUID(1L)
   object StringFunction extends ScalarFunction {
     def eval(s: String): String = s
   }
 
+  @SerialVersionUID(1L)
   object MyStringFunc extends ScalarFunction {
     def eval(s: String): String = s + "haha"
   }
 
+  @SerialVersionUID(1L)
   object BinaryStringFunction extends ScalarFunction {
     def eval(s: BinaryString): BinaryString = s
   }
 
+  @SerialVersionUID(1L)
   object DateFunction extends ScalarFunction {
     def eval(d: Date): String = d.toString
   }
 
+  @SerialVersionUID(1L)
   object LocalDateFunction extends ScalarFunction {
     def eval(d: LocalDate): String = d.toString
   }
 
+  @SerialVersionUID(1L)
   object TimestampFunction extends ScalarFunction {
     def eval(t: java.sql.Timestamp): String = t.toString
   }
 
+  @SerialVersionUID(1L)
   object DateTimeFunction extends ScalarFunction {
     def eval(t: LocalDateTime): String = t.toString
   }
 
+  @SerialVersionUID(1L)
   object TimeFunction extends ScalarFunction {
     def eval(t: java.sql.Time): String = t.toString
   }
 
+  @SerialVersionUID(1L)
   object LocalTimeFunction extends ScalarFunction {
     def eval(t: LocalTime): String = t.toString
   }
 
+  @SerialVersionUID(1L)
   object InstantFunction extends ScalarFunction {
     def eval(t: Instant): Instant = t
 
@@ -232,6 +244,7 @@ object UserDefinedFunctionTestUtils {
   }
 
   // Understand type: Row wrapped as TypeInfoWrappedDataType.
+  @SerialVersionUID(1L)
   object RowFunc extends ScalarFunction {
     def eval(s: String): Row = Row.of(s)
 
@@ -239,11 +252,13 @@ object UserDefinedFunctionTestUtils {
       new RowTypeInfo(Types.STRING)
   }
 
+  @SerialVersionUID(1L)
   object RowToStrFunc extends ScalarFunction {
     def eval(s: BaseRow): String = s.getString(0).toString
   }
 
   // generic.
+  @SerialVersionUID(1L)
   object ListFunc extends ScalarFunction {
     def eval(s: String): java.util.List[String] = util.Arrays.asList(s)
 
@@ -252,6 +267,7 @@ object UserDefinedFunctionTestUtils {
   }
 
   // internal but wrapped as TypeInfoWrappedDataType.
+  @SerialVersionUID(1L)
   object StringFunc extends ScalarFunction {
     def eval(s: String): String = s
 
@@ -259,6 +275,7 @@ object UserDefinedFunctionTestUtils {
       Types.STRING
   }
 
+  @SerialVersionUID(1L)
   object MyPojoFunc extends ScalarFunction {
     def eval(s: MyPojo): Int = s.f2
 
@@ -266,6 +283,7 @@ object UserDefinedFunctionTestUtils {
       Array(MyToPojoFunc.getResultType(signature))
   }
 
+  @SerialVersionUID(1L)
   object MyToPojoFunc extends ScalarFunction {
     def eval(s: Int): MyPojo = new MyPojo(s, s)
 
@@ -277,6 +295,7 @@ object UserDefinedFunctionTestUtils {
     }
   }
 
+  @SerialVersionUID(1L)
   object ToCompositeObj extends ScalarFunction {
     def eval(id: Int, name: String, age: Int): CompositeObj = {
       CompositeObj(id, name, age, "0.0")
@@ -287,6 +306,7 @@ object UserDefinedFunctionTestUtils {
     }
   }
 
+  @SerialVersionUID(1L)
   object TestWrapperUdf extends ScalarFunction {
     def eval(id: Int): Int = {
       id
@@ -297,6 +317,7 @@ object UserDefinedFunctionTestUtils {
     }
   }
 
+  @SerialVersionUID(1L)
   class TestAddWithOpen extends ScalarFunction {
 
     var isOpened: Boolean = false
@@ -328,18 +349,21 @@ object UserDefinedFunctionTestUtils {
     val aliveCounter = new AtomicInteger(0)
   }
 
+  @SerialVersionUID(1L)
   object TestMod extends ScalarFunction {
     def eval(src: Long, mod: Int): Long = {
       src % mod
     }
   }
 
+  @SerialVersionUID(1L)
   object TestExceptionThrown extends ScalarFunction {
     def eval(src: String): Int = {
       throw new NumberFormatException("Cannot parse this input.")
     }
   }
 
+  @SerialVersionUID(1L)
   class ToMillis extends ScalarFunction {
     def eval(t: Timestamp): Long = {
       t.toInstant.toEpochMilli + TimeZone.getDefault.getOffset(t.toInstant.toEpochMilli)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/UserDefinedTableFunctions.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/UserDefinedTableFunctions.scala
index f531fad..54b2987 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/UserDefinedTableFunctions.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/UserDefinedTableFunctions.scala
@@ -34,6 +34,7 @@ import scala.annotation.varargs
 
 case class SimpleUser(name: String, age: Int)
 
+@SerialVersionUID(1L)
 class TableFunc0 extends TableFunction[SimpleUser] {
   // make sure input element's format is "<string>#<int>"
   def eval(user: String): Unit = {
@@ -44,6 +45,7 @@ class TableFunc0 extends TableFunction[SimpleUser] {
   }
 }
 
+@SerialVersionUID(1L)
 class TableFunc1 extends TableFunction[String] {
   def eval(str: String): Unit = {
     if (str.contains("#")){
@@ -58,6 +60,7 @@ class TableFunc1 extends TableFunction[String] {
   }
 }
 
+@SerialVersionUID(1L)
 class TableFunc2 extends TableFunction[Row] {
   def eval(str: String): Unit = {
     if (str.contains("#")) {
@@ -74,6 +77,7 @@ class TableFunc2 extends TableFunction[Row] {
     new RowTypeInfo(Types.STRING, Types.INT)
 }
 
+@SerialVersionUID(1L)
 class TableFunc3(data: String, conf: Map[String, String]) extends TableFunction[SimpleUser] {
 
   def this(data: String) {
@@ -109,6 +113,7 @@ class TableFunc3(data: String, conf: Map[String, String]) extends TableFunction[
   }
 }
 
+@SerialVersionUID(1L)
 class TableFunc5 extends TableFunction[Tuple2[String, Int]] {
   def eval(bytes: Array[Byte]) {
     if (null != bytes) {
@@ -361,6 +366,7 @@ class TableFunc5 extends TableFunction[Tuple2[String, Int]] {
 //  }
 //}
 
+@SerialVersionUID(1L)
 class TableFunc4 extends TableFunction[Row] {
   def eval(b: Byte, s: Short, f: Float): Unit = {
     collect(Row.of("Byte=" + b, "Short=" + s, "Float=" + f))
@@ -371,6 +377,7 @@ class TableFunc4 extends TableFunction[Row] {
   }
 }
 
+@SerialVersionUID(1L)
 class TableFunc6 extends TableFunction[Row] {
   def eval(row: Row): Unit = {
     collect(row)
@@ -384,6 +391,7 @@ class TableFunc6 extends TableFunction[Row] {
   }
 }
 
+@SerialVersionUID(1L)
 class TableFunc7 extends TableFunction[Row] {
 
   def eval(row: Row): Unit = {
@@ -393,6 +401,7 @@ class TableFunc7 extends TableFunction[Row] {
   }
 }
 
+@SerialVersionUID(1L)
 class RF extends ScalarFunction {
 
   def eval(x: Int): java.util.List[Row] = {
@@ -400,6 +409,7 @@ class RF extends ScalarFunction {
   }
 }
 
+@SerialVersionUID(1L)
 class VarArgsFunc0 extends TableFunction[String] {
   @varargs
   def eval(str: String*): Unit = {
@@ -407,6 +417,7 @@ class VarArgsFunc0 extends TableFunction[String] {
   }
 }
 
+@SerialVersionUID(1L)
 class HierarchyTableFunction extends SplittableTableFunction[Boolean, Integer] {
   def eval(user: String) {
     if (user.contains("#")) {
@@ -419,6 +430,7 @@ class HierarchyTableFunction extends SplittableTableFunction[Boolean, Integer] {
 
 abstract class SplittableTableFunction[A, B] extends TableFunction[Tuple3[String, A, B]] {}
 
+@SerialVersionUID(1L)
 class PojoTableFunc extends TableFunction[PojoUser] {
   def eval(user: String) {
     if (user.contains("#")) {
@@ -445,6 +457,7 @@ class PojoUser() {
 
 
 // this is used to check whether scala object is forbidden
+@SerialVersionUID(1L)
 object ObjectTableFunction extends TableFunction[Integer] {
   def eval(a: Int, b: Int): Unit = {
     collect(a)
@@ -452,6 +465,7 @@ object ObjectTableFunction extends TableFunction[Integer] {
   }
 }
 
+@SerialVersionUID(1L)
 class RichTableFunc0 extends TableFunction[String] {
   var openCalled = false
   var closeCalled = false
@@ -486,6 +500,7 @@ class RichTableFunc0 extends TableFunction[String] {
   }
 }
 
+@SerialVersionUID(1L)
 class RichTableFunc1 extends TableFunction[String] {
   var separator: Option[String] = None