You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2022/02/25 08:51:18 UTC

[flink] 01/02: [FLINK-26283][table-planner] Harden AggregateCall serialization in JSON plan

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5c3c90d29024370438d41255aa2650758d9c757f
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Feb 21 15:51:37 2022 +0100

    [FLINK-26283][table-planner] Harden AggregateCall serialization in JSON plan
    
    This closes #18872.
---
 .../exec/serde/AggregateCallJsonDeserializer.java  | 143 +++------------------
 .../exec/serde/AggregateCallJsonSerializer.java    |  73 +++--------
 .../nodes/exec/serde/RexNodeJsonDeserializer.java  |   6 +-
 .../nodes/exec/serde/RexNodeJsonSerializer.java    |   5 +-
 .../ExpandJsonPlanTest_jsonplan/testExpand.out     |  25 +---
 ...tDistinctAggCalls[isMiniBatchEnabled=false].out |  39 ++----
 ...stDistinctAggCalls[isMiniBatchEnabled=true].out |  78 +++--------
 ...gCallsWithGroupBy[isMiniBatchEnabled=false].out |  19 +--
 ...ggCallsWithGroupBy[isMiniBatchEnabled=true].out |  38 ++----
 ...AggWithoutGroupBy[isMiniBatchEnabled=false].out |  25 +---
 ...eAggWithoutGroupBy[isMiniBatchEnabled=true].out |  50 ++-----
 ...erDefinedAggCalls[isMiniBatchEnabled=false].out |  37 +-----
 ...serDefinedAggCalls[isMiniBatchEnabled=true].out |  37 +-----
 .../testEventTimeHopWindow.out                     |  13 +-
 .../testEventTimeSessionWindow.out                 |  13 +-
 .../testEventTimeTumbleWindow.out                  |  30 +----
 .../testProcTimeHopWindow.out                      |   6 +-
 .../testProcTimeSessionWindow.out                  |   6 +-
 .../testProcTimeTumbleWindow.out                   |   7 +-
 .../testIncrementalAggregate.out                   |  20 +--
 ...lAggregateWithSumCountDistinctAndRetraction.out |  84 +++---------
 .../testInnerJoinWithPk.out                        |  12 +-
 .../testProcTimeBoundedNonPartitionedRangeOver.out |   7 +-
 .../testProcTimeBoundedPartitionedRangeOver.out    |  13 +-
 ...undedPartitionedRowsOverWithBuiltinProctime.out |  19 +--
 .../testProcTimeUnboundedPartitionedRangeOver.out  |  13 +-
 ...stProctimeBoundedDistinctPartitionedRowOver.out |  13 +-
 ...edDistinctWithNonDistinctPartitionedRowOver.out |  33 ++---
 .../testRowTimeBoundedPartitionedRowsOver.out      |   7 +-
 .../testDistinctSplitEnabled.out                   |  76 +++--------
 .../testEventTimeCumulateWindow.out                |  26 +---
 .../testEventTimeCumulateWindowWithOffset.out      |  26 +---
 .../testEventTimeHopWindow.out                     |  26 +---
 .../testEventTimeHopWindowWithOffset.out           |  26 +---
 .../testEventTimeTumbleWindow.out                  |  60 ++-------
 .../testEventTimeTumbleWindowWithOffset.out        |  60 ++-------
 .../testProcTimeCumulateWindow.out                 |   7 +-
 .../testProcTimeHopWindow.out                      |   6 +-
 .../testProcTimeTumbleWindow.out                   |   7 +-
 .../testEventTimeTumbleWindow.out                  |  56 +++-----
 40 files changed, 268 insertions(+), 979 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonDeserializer.java
index 69c1cce..28eef53 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonDeserializer.java
@@ -18,24 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.catalog.ContextResolvedFunction;
-import org.apache.flink.table.catalog.DataTypeFactory;
-import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.functions.FunctionIdentifier;
-import org.apache.flink.table.functions.FunctionKind;
-import org.apache.flink.table.functions.ImperativeAggregateFunction;
-import org.apache.flink.table.functions.UserDefinedFunctionHelper;
-import org.apache.flink.table.module.CoreModule;
-import org.apache.flink.table.planner.functions.bridging.BridgingSqlAggFunction;
-import org.apache.flink.table.planner.functions.utils.AggSqlFunction;
-import org.apache.flink.table.types.inference.TypeInference;
-import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.table.utils.EncodingUtils;
-import org.apache.flink.util.Preconditions;
-
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
@@ -44,33 +27,17 @@ import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlAggFunction;
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.sql.SqlSyntax;
-import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.sql.validate.SqlNameMatchers;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
 
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_AGG_FUNCTION;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_APPROXIMATE;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_ARG_LIST;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_BRIDGING;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_BUILT_IN;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_DISPLAY_NAME;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_DISTINCT;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_FILTER_ARG;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_FUNCTION_KIND;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_IGNORE_NULLS;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_INSTANCE;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_KIND;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_NAME;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_REQUIRES_OVER;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_SYNTAX;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_TYPE;
 
 /**
@@ -86,24 +53,26 @@ public class AggregateCallJsonDeserializer extends StdDeserializer<AggregateCall
 
     @Override
     public AggregateCall deserialize(JsonParser jsonParser, DeserializationContext ctx)
-            throws IOException, JsonProcessingException {
-        JsonNode jsonNode = jsonParser.readValueAsTree();
-        JsonNode aggFunNode = jsonNode.get(FIELD_NAME_AGG_FUNCTION);
-        SqlAggFunction aggFunction = toSqlAggFunction(aggFunNode, SerdeContext.get(ctx));
+            throws IOException {
+        final JsonNode jsonNode = jsonParser.readValueAsTree();
+        final SerdeContext serdeContext = SerdeContext.get(ctx);
 
-        List<Integer> argList = new ArrayList<>();
-        JsonNode argListNode = jsonNode.get(FIELD_NAME_ARG_LIST);
+        final String name = jsonNode.required(FIELD_NAME_NAME).asText();
+        final SqlAggFunction aggFunction =
+                (SqlAggFunction)
+                        RexNodeJsonDeserializer.deserializeSqlOperator(jsonNode, serdeContext);
+        final List<Integer> argList = new ArrayList<>();
+        final JsonNode argListNode = jsonNode.required(FIELD_NAME_ARG_LIST);
         for (JsonNode argNode : argListNode) {
             argList.add(argNode.intValue());
         }
-        int filterArg = jsonNode.get(FIELD_NAME_FILTER_ARG).intValue();
-        boolean distinct = jsonNode.get(FIELD_NAME_DISTINCT).asBoolean();
-        boolean approximate = jsonNode.get(FIELD_NAME_APPROXIMATE).asBoolean();
-        boolean ignoreNulls = jsonNode.get(FIELD_NAME_IGNORE_NULLS).asBoolean();
-        JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
-        RelDataType relDataType =
-                ctx.readValue(typeNode.traverse(jsonParser.getCodec()), RelDataType.class);
-        String name = jsonNode.get(FIELD_NAME_NAME).asText();
+        final int filterArg = jsonNode.required(FIELD_NAME_FILTER_ARG).asInt();
+        final boolean distinct = jsonNode.required(FIELD_NAME_DISTINCT).asBoolean();
+        final boolean approximate = jsonNode.required(FIELD_NAME_APPROXIMATE).asBoolean();
+        final boolean ignoreNulls = jsonNode.required(FIELD_NAME_IGNORE_NULLS).asBoolean();
+        final RelDataType relDataType =
+                RelDataTypeJsonDeserializer.deserialize(
+                        jsonNode.required(FIELD_NAME_TYPE), serdeContext);
 
         return AggregateCall.create(
                 aggFunction,
@@ -116,84 +85,4 @@ public class AggregateCallJsonDeserializer extends StdDeserializer<AggregateCall
                 relDataType,
                 name);
     }
-
-    private SqlAggFunction toSqlAggFunction(JsonNode jsonNode, SerdeContext ctx)
-            throws IOException {
-        String name = jsonNode.get(FIELD_NAME_NAME).asText();
-        SqlKind sqlKind = SqlKind.valueOf(jsonNode.get(FIELD_NAME_KIND).asText());
-        SqlSyntax sqlSyntax = SqlSyntax.valueOf(jsonNode.get(FIELD_NAME_SYNTAX).asText());
-        List<SqlOperator> operators = new ArrayList<>();
-        ctx.getOperatorTable()
-                .lookupOperatorOverloads(
-                        new SqlIdentifier(name, new SqlParserPos(0, 0)),
-                        null, // category
-                        sqlSyntax,
-                        operators,
-                        SqlNameMatchers.liberal());
-        for (SqlOperator operator : operators) {
-            // in case different operator has the same kind, check with both name and kind.
-            if (operator.kind == sqlKind) {
-                return (SqlAggFunction) operator;
-            }
-        }
-
-        DataTypeFactory dataTypeFactory =
-                ctx.getFlinkContext().getCatalogManager().getDataTypeFactory();
-
-        // built-in function
-        // TODO supports other module's built-in function
-        if (jsonNode.has(FIELD_NAME_BUILT_IN) && jsonNode.get(FIELD_NAME_BUILT_IN).booleanValue()) {
-            Optional<FunctionDefinition> definition =
-                    CoreModule.INSTANCE.getFunctionDefinition(name);
-            Preconditions.checkArgument(definition.isPresent());
-            TypeInference typeInference = definition.get().getTypeInference(dataTypeFactory);
-            return BridgingSqlAggFunction.of(
-                    dataTypeFactory,
-                    ctx.getTypeFactory(),
-                    sqlKind,
-                    ContextResolvedFunction.permanent(
-                            FunctionIdentifier.of(name), definition.get()),
-                    typeInference);
-        }
-
-        if (jsonNode.has(FIELD_NAME_FUNCTION_KIND) && jsonNode.has(FIELD_NAME_INSTANCE)) {
-            FunctionKind functionKind =
-                    FunctionKind.valueOf(
-                            jsonNode.get(FIELD_NAME_FUNCTION_KIND).asText().toUpperCase());
-            String instanceStr = jsonNode.get(FIELD_NAME_INSTANCE).asText();
-            if (functionKind != FunctionKind.AGGREGATE) {
-                throw new TableException("Unknown function kind: " + functionKind);
-            }
-            if (jsonNode.has(FIELD_NAME_BRIDGING)
-                    && jsonNode.get(FIELD_NAME_BRIDGING).booleanValue()) {
-                FunctionDefinition definition =
-                        EncodingUtils.decodeStringToObject(instanceStr, ctx.getClassLoader());
-                TypeInference typeInference = definition.getTypeInference(dataTypeFactory);
-                return BridgingSqlAggFunction.of(
-                        dataTypeFactory,
-                        ctx.getTypeFactory(),
-                        sqlKind,
-                        ContextResolvedFunction.permanent(FunctionIdentifier.of(name), definition),
-                        typeInference);
-            } else {
-                String displayName = jsonNode.get(FIELD_NAME_DISPLAY_NAME).asText();
-                boolean requiresOver = jsonNode.get(FIELD_NAME_REQUIRES_OVER).booleanValue();
-                ImperativeAggregateFunction<?, ?> function =
-                        EncodingUtils.decodeStringToObject(instanceStr, ctx.getClassLoader());
-                return AggSqlFunction.apply(
-                        FunctionIdentifier.of(name),
-                        displayName,
-                        function,
-                        TypeConversions.fromLegacyInfoToDataType(
-                                UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(
-                                        function)),
-                        TypeConversions.fromLegacyInfoToDataType(
-                                UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(
-                                        function)),
-                        ctx.getTypeFactory(),
-                        requiresOver);
-            }
-        }
-        throw new TableException("Unknown operator: " + jsonNode.toPrettyString());
-    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java
index 27f2549..35e3ac9 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java
@@ -18,47 +18,35 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
-import org.apache.flink.table.functions.BuiltInFunctionDefinition;
-import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.functions.FunctionKind;
-import org.apache.flink.table.planner.functions.bridging.BridgingSqlAggFunction;
-import org.apache.flink.table.planner.functions.utils.AggSqlFunction;
-import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanCompilation;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
 
 import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.sql.SqlAggFunction;
 
 import java.io.IOException;
 
 /**
- * JSON serializer for {@link AggregateCall}. refer to {@link AggregateCallJsonDeserializer} for
- * deserializer.
+ * JSON serializer for {@link AggregateCall}.
+ *
+ * @see AggregateCallJsonDeserializer for the reverse operation
  */
+@Internal
 public class AggregateCallJsonSerializer extends StdSerializer<AggregateCall> {
     private static final long serialVersionUID = 1L;
 
-    public static final String FIELD_NAME_KIND = "kind";
-    public static final String FIELD_NAME_TYPE = "type";
     public static final String FIELD_NAME_NAME = "name";
-
-    public static final String FIELD_NAME_AGG_FUNCTION = "aggFunction";
-    public static final String FIELD_NAME_INSTANCE = "instance";
-    public static final String FIELD_NAME_SYNTAX = "syntax";
-    public static final String FIELD_NAME_DISPLAY_NAME = "displayName";
-    public static final String FIELD_NAME_FUNCTION_KIND = "functionKind";
-    public static final String FIELD_NAME_BRIDGING = "bridging";
-    public static final String FIELD_NAME_BUILT_IN = "builtIn";
-    public static final String FIELD_NAME_REQUIRES_OVER = "requiresOver";
-
     public static final String FIELD_NAME_ARG_LIST = "argList";
     public static final String FIELD_NAME_FILTER_ARG = "filterArg";
     public static final String FIELD_NAME_DISTINCT = "distinct";
     public static final String FIELD_NAME_APPROXIMATE = "approximate";
     public static final String FIELD_NAME_IGNORE_NULLS = "ignoreNulls";
+    public static final String FIELD_NAME_TYPE = "type";
 
     public AggregateCallJsonSerializer() {
         super(AggregateCall.class);
@@ -70,9 +58,17 @@ public class AggregateCallJsonSerializer extends StdSerializer<AggregateCall> {
             JsonGenerator jsonGenerator,
             SerializerProvider serializerProvider)
             throws IOException {
+        final ReadableConfig config = SerdeContext.get(serializerProvider).getConfiguration();
+        final CatalogPlanCompilation compilationStrategy =
+                config.get(TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS);
+
         jsonGenerator.writeStartObject();
         jsonGenerator.writeStringField(FIELD_NAME_NAME, aggCall.getName());
-        serialize(aggCall.getAggregation(), jsonGenerator);
+        RexNodeJsonSerializer.serializeSqlOperator(
+                aggCall.getAggregation(),
+                jsonGenerator,
+                serializerProvider,
+                compilationStrategy == CatalogPlanCompilation.ALL);
         jsonGenerator.writeFieldName(FIELD_NAME_ARG_LIST);
         jsonGenerator.writeStartArray();
         for (int arg : aggCall.getArgList()) {
@@ -86,37 +82,4 @@ public class AggregateCallJsonSerializer extends StdSerializer<AggregateCall> {
         serializerProvider.defaultSerializeField(FIELD_NAME_TYPE, aggCall.getType(), jsonGenerator);
         jsonGenerator.writeEndObject();
     }
-
-    private void serialize(SqlAggFunction operator, JsonGenerator gen) throws IOException {
-        gen.writeFieldName(FIELD_NAME_AGG_FUNCTION);
-        gen.writeStartObject();
-        gen.writeStringField(FIELD_NAME_NAME, operator.getName());
-        gen.writeStringField(FIELD_NAME_KIND, operator.kind.name());
-        gen.writeStringField(FIELD_NAME_SYNTAX, operator.getSyntax().name());
-        // TODO if a udf is registered with class name, class name is recorded enough
-        if (operator instanceof AggSqlFunction) {
-            AggSqlFunction aggSqlFunc = (AggSqlFunction) operator;
-            gen.writeStringField(FIELD_NAME_DISPLAY_NAME, aggSqlFunc.displayName());
-            gen.writeStringField(FIELD_NAME_FUNCTION_KIND, FunctionKind.AGGREGATE.name());
-            gen.writeBooleanField(FIELD_NAME_REQUIRES_OVER, aggSqlFunc.requiresOver());
-            gen.writeStringField(
-                    FIELD_NAME_INSTANCE,
-                    EncodingUtils.encodeObjectToString(aggSqlFunc.aggregateFunction()));
-        } else if (operator instanceof BridgingSqlAggFunction) {
-            BridgingSqlAggFunction bridgingSqlAggFunc = (BridgingSqlAggFunction) operator;
-            FunctionDefinition functionDefinition = bridgingSqlAggFunc.getDefinition();
-            if (functionDefinition instanceof BuiltInFunctionDefinition) {
-                // just record the flag, we can find it by name
-                gen.writeBooleanField(FIELD_NAME_BUILT_IN, true);
-            } else {
-                assert functionDefinition.getKind() == FunctionKind.AGGREGATE;
-                gen.writeStringField(FIELD_NAME_FUNCTION_KIND, FunctionKind.AGGREGATE.name());
-                gen.writeStringField(
-                        FIELD_NAME_INSTANCE,
-                        EncodingUtils.encodeObjectToString(functionDefinition));
-                gen.writeBooleanField(FIELD_NAME_BRIDGING, true);
-            }
-        }
-        gen.writeEndObject();
-    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
index 9c23498..ff9e322 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
@@ -317,8 +317,10 @@ public class RexNodeJsonDeserializer extends StdDeserializer<RexNode> {
         return serdeContext.getRexBuilder().makeCall(callType, operator, rexOperands);
     }
 
-    private static SqlOperator deserializeSqlOperator(
-            JsonNode jsonNode, SerdeContext serdeContext) {
+    // --------------------------------------------------------------------------------------------
+
+    /** Logic shared with {@link AggregateCallJsonDeserializer}. */
+    static SqlOperator deserializeSqlOperator(JsonNode jsonNode, SerdeContext serdeContext) {
         final SqlSyntax syntax;
         if (jsonNode.has(FIELD_NAME_SYNTAX)) {
             syntax =
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
index bd5a6e8..d38f08a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
@@ -359,7 +359,10 @@ public class RexNodeJsonSerializer extends StdSerializer<RexNode> {
         gen.writeEndObject();
     }
 
-    private static void serializeSqlOperator(
+    // --------------------------------------------------------------------------------------------
+
+    /** Logic shared with {@link AggregateCallJsonSerializer}. */
+    static void serializeSqlOperator(
             SqlOperator operator,
             JsonGenerator gen,
             SerializerProvider serializerProvider,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandJsonPlanTest_jsonplan/testExpand.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandJsonPlanTest_jsonplan/testExpand.out
index 3925e50..04d059e 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandJsonPlanTest_jsonplan/testExpand.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandJsonPlanTest_jsonplan/testExpand.out
@@ -235,11 +235,8 @@
     "grouping" : [ 0, 3, 4 ],
     "aggCalls" : [ {
       "name" : null,
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : 5,
       "distinct" : true,
@@ -248,11 +245,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "FIRST_VALUE",
-        "kind" : "FIRST_VALUE",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$FIRST_VALUE$1",
       "argList" : [ 2 ],
       "filterArg" : 6,
       "distinct" : false,
@@ -291,11 +284,7 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : null,
-      "aggFunction" : {
-        "name" : "$SUM0",
-        "kind" : "SUM0",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$$SUM0$1",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -304,11 +293,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "FIRST_VALUE",
-        "kind" : "FIRST_VALUE",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$FIRST_VALUE$1",
       "argList" : [ 4 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=false].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=false].out
index 23d50e2..5a69103 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=false].out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=false].out
@@ -97,11 +97,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "cnt_a1",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : 2,
       "distinct" : true,
@@ -110,11 +107,8 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "cnt_a2",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -123,11 +117,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "sum_a",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -136,11 +126,7 @@
       "type" : "BIGINT"
     }, {
       "name" : "sum_b",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -149,11 +135,7 @@
       "type" : "INT NOT NULL"
     }, {
       "name" : "avg_b",
-      "aggFunction" : {
-        "name" : "AVG",
-        "kind" : "AVG",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$AVG$1",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -162,11 +144,8 @@
       "type" : "INT NOT NULL"
     }, {
       "name" : "cnt_d",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 4 ],
       "filterArg" : -1,
       "distinct" : true,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out
index 37b6dad..d56ae50 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out
@@ -100,11 +100,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "cnt_a1",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : 2,
       "distinct" : true,
@@ -113,11 +110,8 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "cnt_a2",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -126,11 +120,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "sum_a",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -139,11 +129,7 @@
       "type" : "BIGINT"
     }, {
       "name" : "sum_b",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -152,11 +138,7 @@
       "type" : "INT NOT NULL"
     }, {
       "name" : "avg_b",
-      "aggFunction" : {
-        "name" : "AVG",
-        "kind" : "AVG",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$AVG$1",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -165,11 +147,8 @@
       "type" : "INT NOT NULL"
     }, {
       "name" : "cnt_d",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 4 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -361,11 +340,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "cnt_a1",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : 2,
       "distinct" : true,
@@ -374,11 +350,8 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "cnt_a2",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -387,11 +360,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "sum_a",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -400,11 +369,7 @@
       "type" : "BIGINT"
     }, {
       "name" : "sum_b",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -413,11 +378,7 @@
       "type" : "INT NOT NULL"
     }, {
       "name" : "avg_b",
-      "aggFunction" : {
-        "name" : "AVG",
-        "kind" : "AVG",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$AVG$1",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -426,11 +387,8 @@
       "type" : "INT NOT NULL"
     }, {
       "name" : "cnt_d",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 4 ],
       "filterArg" : -1,
       "distinct" : true,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=false].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=false].out
index 979378f..54e1aa4 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=false].out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=false].out
@@ -102,11 +102,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "cnt_a",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -115,11 +112,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "max_b",
-      "aggFunction" : {
-        "name" : "MAX",
-        "kind" : "MAX",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$MAX$1",
       "argList" : [ 0 ],
       "filterArg" : 2,
       "distinct" : false,
@@ -128,11 +121,7 @@
       "type" : "INT"
     }, {
       "name" : "min_c",
-      "aggFunction" : {
-        "name" : "MIN",
-        "kind" : "MIN",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$MIN$1",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=true].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=true].out
index 155dea2..6d0adeb 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=true].out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=true].out
@@ -105,11 +105,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "cnt_a",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -118,11 +115,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "max_b",
-      "aggFunction" : {
-        "name" : "MAX",
-        "kind" : "MAX",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$MAX$1",
       "argList" : [ 0 ],
       "filterArg" : 2,
       "distinct" : false,
@@ -131,11 +124,7 @@
       "type" : "INT"
     }, {
       "name" : "min_c",
-      "aggFunction" : {
-        "name" : "MIN",
-        "kind" : "MIN",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$MIN$1",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -173,11 +162,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "cnt_a",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -186,11 +172,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "max_b",
-      "aggFunction" : {
-        "name" : "MAX",
-        "kind" : "MAX",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$MAX$1",
       "argList" : [ 0 ],
       "filterArg" : 2,
       "distinct" : false,
@@ -199,11 +181,7 @@
       "type" : "INT"
     }, {
       "name" : "min_c",
-      "aggFunction" : {
-        "name" : "MIN",
-        "kind" : "MIN",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$MIN$1",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=false].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=false].out
index 60cc1ac..c8d7b01 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=false].out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=false].out
@@ -107,11 +107,7 @@
     "grouping" : [ ],
     "aggCalls" : [ {
       "name" : "avg_a",
-      "aggFunction" : {
-        "name" : "AVG",
-        "kind" : "AVG",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$AVG$1",
       "argList" : [ 0 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -120,11 +116,8 @@
       "type" : "BIGINT"
     }, {
       "name" : "cnt",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -133,11 +126,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "min_b",
-      "aggFunction" : {
-        "name" : "MIN",
-        "kind" : "MIN",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$MIN$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -146,11 +135,7 @@
       "type" : "INT"
     }, {
       "name" : "max_c",
-      "aggFunction" : {
-        "name" : "MAX",
-        "kind" : "MAX",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$MAX$1",
       "argList" : [ 2 ],
       "filterArg" : 3,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=true].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=true].out
index 89833ab..c53e463 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=true].out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=true].out
@@ -111,11 +111,7 @@
     "grouping" : [ ],
     "aggCalls" : [ {
       "name" : "avg_a",
-      "aggFunction" : {
-        "name" : "AVG",
-        "kind" : "AVG",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$AVG$1",
       "argList" : [ 0 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -124,11 +120,8 @@
       "type" : "BIGINT"
     }, {
       "name" : "cnt",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -137,11 +130,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "min_b",
-      "aggFunction" : {
-        "name" : "MIN",
-        "kind" : "MIN",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$MIN$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -150,11 +139,7 @@
       "type" : "INT"
     }, {
       "name" : "max_c",
-      "aggFunction" : {
-        "name" : "MAX",
-        "kind" : "MAX",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$MAX$1",
       "argList" : [ 2 ],
       "filterArg" : 3,
       "distinct" : false,
@@ -191,11 +176,7 @@
     "grouping" : [ ],
     "aggCalls" : [ {
       "name" : "avg_a",
-      "aggFunction" : {
-        "name" : "AVG",
-        "kind" : "AVG",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$AVG$1",
       "argList" : [ 0 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -204,11 +185,8 @@
       "type" : "BIGINT"
     }, {
       "name" : "cnt",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -217,11 +195,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "min_b",
-      "aggFunction" : {
-        "name" : "MIN",
-        "kind" : "MIN",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$MIN$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -230,11 +204,7 @@
       "type" : "INT"
     }, {
       "name" : "max_c",
-      "aggFunction" : {
-        "name" : "MAX",
-        "kind" : "MAX",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$MAX$1",
       "argList" : [ 2 ],
       "filterArg" : 3,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=false].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=false].out
index b278f6a..cc12b89 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=false].out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=false].out
@@ -91,14 +91,7 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "a1",
-      "aggFunction" : {
-        "name" : "my_sum1",
-        "kind" : "OTHER_FUNCTION",
-        "syntax" : "FUNCTION",
-        "functionKind" : "AGGREGATE",
-        "instance" : "rO0ABXNyAFhvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucGxhbi51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkVmFyU3VtMUFnZ0Z1bmN0aW9uUncPGXj5ZSICAAB4cgAyb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuQWdncmVnYXRlRnVuY3Rpb24g1IzcoWgbiQIAAHhyADxvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5JbXBlcmF0aXZlQWdncmVnYXRlRnVuY3Rpb27yV4D2r81spwIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9uWWgLCLtDDxYCAAB4cA",
-        "bridging" : true
-      },
+      "catalogName" : "`default_catalog`.`default_database`.`my_sum1`",
       "argList" : [ 0, 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -107,14 +100,7 @@
       "type" : "BIGINT"
     }, {
       "name" : "a2",
-      "aggFunction" : {
-        "name" : "my_sum2",
-        "kind" : "OTHER_FUNCTION",
-        "syntax" : "FUNCTION",
-        "functionKind" : "AGGREGATE",
-        "instance" : "rO0ABXNyAFhvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucGxhbi51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkVmFyU3VtMkFnZ0Z1bmN0aW9ucnj2tPeOlPcCAAB4cgAyb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuQWdncmVnYXRlRnVuY3Rpb24g1IzcoWgbiQIAAHhyADxvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5JbXBlcmF0aXZlQWdncmVnYXRlRnVuY3Rpb27yV4D2r81spwIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9uWWgLCLtDDxYCAAB4cA",
-        "bridging" : true
-      },
+      "systemName" : "my_sum2",
       "argList" : [ 2, 0 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -123,14 +109,8 @@
       "type" : "BIGINT"
     }, {
       "name" : "a3",
-      "aggFunction" : {
-        "name" : "my_avg",
-        "kind" : "OTHER_FUNCTION",
-        "syntax" : "FUNCTION",
-        "functionKind" : "AGGREGATE",
-        "instance" : "rO0ABXNyAFFvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucGxhbi51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkV2VpZ2h0ZWRBdmctK0s1MInyIQIAAHhyADJvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5BZ2dyZWdhdGVGdW5jdGlvbiDUjNyhaBuJAgAAeHIAPG9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLkltcGVyYXRpdmVBZ2dyZWdhdGVGdW5jdGlvbvJXgPavzWynAgAAeHIANG9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLlVzZXJEZWZpbmVkRnVuY3Rpb25ZaAsIu0MPFgIAAHhw",
-        "bridging" : true
-      },
+      "catalogName" : "`default_catalog`.`default_database`.`my_avg`",
+      "class" : "org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$WeightedAvg",
       "argList" : [ 3, 4 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -139,14 +119,7 @@
       "type" : "BIGINT"
     }, {
       "name" : "c1",
-      "aggFunction" : {
-        "name" : "my_count",
-        "kind" : "OTHER_FUNCTION",
-        "syntax" : "FUNCTION",
-        "functionKind" : "AGGREGATE",
-        "instance" : "rO0ABXNyAFNvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucGxhbi51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkQ291bnREaXN0aW5jdIvbEsgDUXHeAgAAeHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uINSM3KFoG4kCAAB4cgA8b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuSW1wZXJhdGl2ZUFnZ3JlZ2F0ZUZ1bmN0aW9u8leA9q_NbKcCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvblloCwi7Qw8WAgAAeHA",
-        "bridging" : true
-      },
+      "systemName" : "my_count",
       "argList" : [ 5 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=true].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=true].out
index b03d962..41687d0 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=true].out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=true].out
@@ -107,14 +107,7 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "a1",
-      "aggFunction" : {
-        "name" : "my_sum1",
-        "kind" : "OTHER_FUNCTION",
-        "syntax" : "FUNCTION",
-        "functionKind" : "AGGREGATE",
-        "instance" : "rO0ABXNyAFhvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucGxhbi51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkVmFyU3VtMUFnZ0Z1bmN0aW9uUncPGXj5ZSICAAB4cgAyb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuQWdncmVnYXRlRnVuY3Rpb24g1IzcoWgbiQIAAHhyADxvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5JbXBlcmF0aXZlQWdncmVnYXRlRnVuY3Rpb27yV4D2r81spwIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9uWWgLCLtDDxYCAAB4cA",
-        "bridging" : true
-      },
+      "catalogName" : "`default_catalog`.`default_database`.`my_sum1`",
       "argList" : [ 0, 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -123,14 +116,7 @@
       "type" : "BIGINT"
     }, {
       "name" : "a2",
-      "aggFunction" : {
-        "name" : "my_sum2",
-        "kind" : "OTHER_FUNCTION",
-        "syntax" : "FUNCTION",
-        "functionKind" : "AGGREGATE",
-        "instance" : "rO0ABXNyAFhvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucGxhbi51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkVmFyU3VtMkFnZ0Z1bmN0aW9ucnj2tPeOlPcCAAB4cgAyb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuQWdncmVnYXRlRnVuY3Rpb24g1IzcoWgbiQIAAHhyADxvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5JbXBlcmF0aXZlQWdncmVnYXRlRnVuY3Rpb27yV4D2r81spwIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9uWWgLCLtDDxYCAAB4cA",
-        "bridging" : true
-      },
+      "systemName" : "my_sum2",
       "argList" : [ 2, 0 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -139,14 +125,8 @@
       "type" : "BIGINT"
     }, {
       "name" : "a3",
-      "aggFunction" : {
-        "name" : "my_avg",
-        "kind" : "OTHER_FUNCTION",
-        "syntax" : "FUNCTION",
-        "functionKind" : "AGGREGATE",
-        "instance" : "rO0ABXNyAFFvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucGxhbi51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkV2VpZ2h0ZWRBdmctK0s1MInyIQIAAHhyADJvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5BZ2dyZWdhdGVGdW5jdGlvbiDUjNyhaBuJAgAAeHIAPG9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLkltcGVyYXRpdmVBZ2dyZWdhdGVGdW5jdGlvbvJXgPavzWynAgAAeHIANG9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLlVzZXJEZWZpbmVkRnVuY3Rpb25ZaAsIu0MPFgIAAHhw",
-        "bridging" : true
-      },
+      "catalogName" : "`default_catalog`.`default_database`.`my_avg`",
+      "class" : "org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$WeightedAvg",
       "argList" : [ 3, 4 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -155,14 +135,7 @@
       "type" : "BIGINT"
     }, {
       "name" : "c1",
-      "aggFunction" : {
-        "name" : "my_count",
-        "kind" : "OTHER_FUNCTION",
-        "syntax" : "FUNCTION",
-        "functionKind" : "AGGREGATE",
-        "instance" : "rO0ABXNyAFNvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucGxhbi51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkQ291bnREaXN0aW5jdIvbEsgDUXHeAgAAeHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uINSM3KFoG4kCAAB4cgA8b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuSW1wZXJhdGl2ZUFnZ3JlZ2F0ZUZ1bmN0aW9u8leA9q_NbKcCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvblloCwi7Qw8WAgAAeHA",
-        "bridging" : true
-      },
+      "systemName" : "my_count",
       "argList" : [ 5 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out
index ff5a5c6..aef0a94 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out
@@ -203,11 +203,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$1",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -216,11 +213,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$2",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out
index ef5f44c..a0a44cb 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out
@@ -203,11 +203,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$1",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -216,11 +213,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$2",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
index 442e2b9..f41b8d2 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
@@ -203,11 +203,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$3",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -216,11 +213,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$4",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -229,11 +222,8 @@
       "type" : "INT"
     }, {
       "name" : "EXPR$5",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -242,14 +232,8 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$6",
-      "aggFunction" : {
-        "name" : "concat_distinct_agg",
-        "kind" : "OTHER_FUNCTION",
-        "syntax" : "FUNCTION",
-        "functionKind" : "AGGREGATE",
-        "instance" : "rO0ABXNyAF9vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucGxhbi51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkQ29uY2F0RGlzdGluY3RBZ2dGdW5jdGlvbtrVmfNk5uTFAgAAeHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uINSM3KFoG4kCAAB4cgA8b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuSW1wZXJhdGl2ZUFnZ3JlZ2F0ZUZ1bmN0aW9u8leA9q_NbKcCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvblloCwi7Qw8WAgAAeHA",
-        "bridging" : true
-      },
+      "catalogName" : "`default_catalog`.`default_database`.`concat_distinct_agg`",
+      "class" : "org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$ConcatDistinctAggFunction",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out
index 825f324..1c9fbb7 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out
@@ -290,11 +290,7 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$1",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out
index 5f3bb87..b44c9c4 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out
@@ -290,11 +290,7 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$1",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out
index f96a1c5..d9c1ed0 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out
@@ -269,11 +269,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$2",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out
index e2d4464..76cc64f 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out
@@ -105,11 +105,8 @@
     "grouping" : [ 0, 2 ],
     "aggCalls" : [ {
       "name" : null,
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -216,11 +213,8 @@
     "finalAggGrouping" : [ 0 ],
     "partialOriginalAggCalls" : [ {
       "name" : null,
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -259,11 +253,7 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : null,
-      "aggFunction" : {
-        "name" : "$SUM0",
-        "kind" : "SUM0",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$$SUM0$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out
index b5a5b58..abb17ca 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out
@@ -65,11 +65,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "b",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -78,11 +75,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "b1",
-      "aggFunction" : {
-        "name" : "MAX",
-        "kind" : "MAX",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$MAX$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -120,11 +113,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "b",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -133,11 +123,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "b1",
-      "aggFunction" : {
-        "name" : "MAX",
-        "kind" : "MAX",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$MAX$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -204,11 +190,7 @@
     "grouping" : [ 0, 2 ],
     "aggCalls" : [ {
       "name" : null,
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -217,11 +199,8 @@
       "type" : "INT NOT NULL"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -230,11 +209,8 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -343,11 +319,7 @@
     "finalAggGrouping" : [ 0 ],
     "partialOriginalAggCalls" : [ {
       "name" : null,
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -356,11 +328,8 @@
       "type" : "INT NOT NULL"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -369,11 +338,8 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -412,11 +378,7 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : null,
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -425,11 +387,7 @@
       "type" : "INT NOT NULL"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "$SUM0",
-        "kind" : "SUM0",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$$SUM0$1",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -438,11 +396,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "$SUM0",
-        "kind" : "SUM0",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$$SUM0$1",
       "argList" : [ 4 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithPk.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithPk.out
index 5326ed8..3bd9221 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithPk.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithPk.out
@@ -59,11 +59,7 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "a2",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -177,11 +173,7 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "b2",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out
index 92f7c91..2541c41 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out
@@ -285,11 +285,8 @@
         },
         "aggCalls" : [ {
           "name" : "w0$o0",
-          "aggFunction" : {
-            "name" : "COUNT",
-            "kind" : "COUNT",
-            "syntax" : "FUNCTION_STAR"
-          },
+          "syntax" : "FUNCTION_STAR",
+          "internalName" : "$COUNT$1",
           "argList" : [ 0 ],
           "filterArg" : -1,
           "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out
index 84130e2..461837d 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out
@@ -296,11 +296,8 @@
         },
         "aggCalls" : [ {
           "name" : "w0$o0",
-          "aggFunction" : {
-            "name" : "COUNT",
-            "kind" : "COUNT",
-            "syntax" : "FUNCTION_STAR"
-          },
+          "syntax" : "FUNCTION_STAR",
+          "internalName" : "$COUNT$1",
           "argList" : [ 1 ],
           "filterArg" : -1,
           "distinct" : false,
@@ -309,11 +306,7 @@
           "type" : "BIGINT NOT NULL"
         }, {
           "name" : "w0$o1",
-          "aggFunction" : {
-            "name" : "$SUM0",
-            "kind" : "SUM0",
-            "syntax" : "FUNCTION"
-          },
+          "internalName" : "$$SUM0$1",
           "argList" : [ 1 ],
           "filterArg" : -1,
           "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime.out
index 2c7b5d6..fba1d48 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime.out
@@ -233,11 +233,8 @@
         },
         "aggCalls" : [ {
           "name" : "w0$o0",
-          "aggFunction" : {
-            "name" : "COUNT",
-            "kind" : "COUNT",
-            "syntax" : "FUNCTION_STAR"
-          },
+          "syntax" : "FUNCTION_STAR",
+          "internalName" : "$COUNT$1",
           "argList" : [ 1 ],
           "filterArg" : -1,
           "distinct" : false,
@@ -246,11 +243,7 @@
           "type" : "BIGINT NOT NULL"
         }, {
           "name" : "w0$o1",
-          "aggFunction" : {
-            "name" : "$SUM0",
-            "kind" : "SUM0",
-            "syntax" : "FUNCTION"
-          },
+          "internalName" : "$$SUM0$1",
           "argList" : [ 1 ],
           "filterArg" : -1,
           "distinct" : false,
@@ -259,11 +252,7 @@
           "type" : "BIGINT NOT NULL"
         }, {
           "name" : "w0$o2",
-          "aggFunction" : {
-            "name" : "MIN",
-            "kind" : "MIN",
-            "syntax" : "FUNCTION"
-          },
+          "internalName" : "$MIN$1",
           "argList" : [ 1 ],
           "filterArg" : -1,
           "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out
index 6ae8418..6f7c396 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out
@@ -274,11 +274,8 @@
         },
         "aggCalls" : [ {
           "name" : "w0$o0",
-          "aggFunction" : {
-            "name" : "COUNT",
-            "kind" : "COUNT",
-            "syntax" : "FUNCTION_STAR"
-          },
+          "syntax" : "FUNCTION_STAR",
+          "internalName" : "$COUNT$1",
           "argList" : [ 0 ],
           "filterArg" : -1,
           "distinct" : false,
@@ -287,11 +284,7 @@
           "type" : "BIGINT NOT NULL"
         }, {
           "name" : "w0$o1",
-          "aggFunction" : {
-            "name" : "$SUM0",
-            "kind" : "SUM0",
-            "syntax" : "FUNCTION"
-          },
+          "internalName" : "$$SUM0$1",
           "argList" : [ 0 ],
           "filterArg" : -1,
           "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProctimeBoundedDistinctPartitionedRowOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProctimeBoundedDistinctPartitionedRowOver.out
index c457ed4..6c8d61c 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProctimeBoundedDistinctPartitionedRowOver.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProctimeBoundedDistinctPartitionedRowOver.out
@@ -280,11 +280,8 @@
         },
         "aggCalls" : [ {
           "name" : "w0$o0",
-          "aggFunction" : {
-            "name" : "COUNT",
-            "kind" : "COUNT",
-            "syntax" : "FUNCTION_STAR"
-          },
+          "syntax" : "FUNCTION_STAR",
+          "internalName" : "$COUNT$1",
           "argList" : [ 0 ],
           "filterArg" : -1,
           "distinct" : true,
@@ -293,11 +290,7 @@
           "type" : "BIGINT NOT NULL"
         }, {
           "name" : "w0$o1",
-          "aggFunction" : {
-            "name" : "$SUM0",
-            "kind" : "SUM0",
-            "syntax" : "FUNCTION"
-          },
+          "internalName" : "$$SUM0$1",
           "argList" : [ 0 ],
           "filterArg" : -1,
           "distinct" : true,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProctimeBoundedDistinctWithNonDistinctPartitionedRowOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProctimeBoundedDistinctWithNonDistinctPartitionedRowOver.out
index 2267835..3e8b12d 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProctimeBoundedDistinctWithNonDistinctPartitionedRowOver.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProctimeBoundedDistinctWithNonDistinctPartitionedRowOver.out
@@ -291,11 +291,8 @@
         },
         "aggCalls" : [ {
           "name" : "w0$o0",
-          "aggFunction" : {
-            "name" : "COUNT",
-            "kind" : "COUNT",
-            "syntax" : "FUNCTION_STAR"
-          },
+          "syntax" : "FUNCTION_STAR",
+          "internalName" : "$COUNT$1",
           "argList" : [ 0 ],
           "filterArg" : -1,
           "distinct" : false,
@@ -304,11 +301,7 @@
           "type" : "BIGINT NOT NULL"
         }, {
           "name" : "w0$o1",
-          "aggFunction" : {
-            "name" : "$SUM0",
-            "kind" : "SUM0",
-            "syntax" : "FUNCTION"
-          },
+          "internalName" : "$$SUM0$1",
           "argList" : [ 0 ],
           "filterArg" : -1,
           "distinct" : false,
@@ -317,11 +310,8 @@
           "type" : "INT NOT NULL"
         }, {
           "name" : "w0$o2",
-          "aggFunction" : {
-            "name" : "COUNT",
-            "kind" : "COUNT",
-            "syntax" : "FUNCTION_STAR"
-          },
+          "syntax" : "FUNCTION_STAR",
+          "internalName" : "$COUNT$1",
           "argList" : [ 0 ],
           "filterArg" : -1,
           "distinct" : true,
@@ -330,11 +320,8 @@
           "type" : "BIGINT NOT NULL"
         }, {
           "name" : "w0$o3",
-          "aggFunction" : {
-            "name" : "COUNT",
-            "kind" : "COUNT",
-            "syntax" : "FUNCTION_STAR"
-          },
+          "syntax" : "FUNCTION_STAR",
+          "internalName" : "$COUNT$1",
           "argList" : [ 2 ],
           "filterArg" : -1,
           "distinct" : true,
@@ -343,11 +330,7 @@
           "type" : "BIGINT NOT NULL"
         }, {
           "name" : "w0$o4",
-          "aggFunction" : {
-            "name" : "$SUM0",
-            "kind" : "SUM0",
-            "syntax" : "FUNCTION"
-          },
+          "internalName" : "$$SUM0$1",
           "argList" : [ 2 ],
           "filterArg" : -1,
           "distinct" : true,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testRowTimeBoundedPartitionedRowsOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testRowTimeBoundedPartitionedRowsOver.out
index 4d9d22c..cd7d26f 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testRowTimeBoundedPartitionedRowsOver.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testRowTimeBoundedPartitionedRowsOver.out
@@ -167,11 +167,8 @@
         },
         "aggCalls" : [ {
           "name" : "w0$o0",
-          "aggFunction" : {
-            "name" : "COUNT",
-            "kind" : "COUNT",
-            "syntax" : "FUNCTION_STAR"
-          },
+          "syntax" : "FUNCTION_STAR",
+          "internalName" : "$COUNT$1",
           "argList" : [ 0 ],
           "filterArg" : -1,
           "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testDistinctSplitEnabled.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testDistinctSplitEnabled.out
index ec56389..be85b4c 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testDistinctSplitEnabled.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testDistinctSplitEnabled.out
@@ -245,11 +245,8 @@
     "grouping" : [ 0, 3 ],
     "aggCalls" : [ {
       "name" : null,
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -258,11 +255,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -271,11 +264,8 @@
       "type" : "BIGINT"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -412,11 +402,8 @@
     "grouping" : [ 0, 1 ],
     "aggCalls" : [ {
       "name" : null,
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -425,11 +412,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -438,11 +421,8 @@
       "type" : "BIGINT"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -572,11 +552,7 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : null,
-      "aggFunction" : {
-        "name" : "$SUM0",
-        "kind" : "SUM0",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$$SUM0$1",
       "argList" : [ 4 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -585,11 +561,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 5 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -598,11 +570,7 @@
       "type" : "BIGINT"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "$SUM0",
-        "kind" : "SUM0",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$$SUM0$1",
       "argList" : [ 6 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -654,11 +622,7 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : null,
-      "aggFunction" : {
-        "name" : "$SUM0",
-        "kind" : "SUM0",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$$SUM0$1",
       "argList" : [ 4 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -667,11 +631,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 5 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -680,11 +640,7 @@
       "type" : "BIGINT"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "$SUM0",
-        "kind" : "SUM0",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$$SUM0$1",
       "argList" : [ 6 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeCumulateWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeCumulateWindow.out
index 6f1dc2f..4d3c3dc 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeCumulateWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeCumulateWindow.out
@@ -224,11 +224,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$2",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -237,11 +234,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$3",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -292,11 +285,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$2",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -305,11 +295,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$3",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeCumulateWindowWithOffset.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeCumulateWindowWithOffset.out
index 9eefa48..3915c80 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeCumulateWindowWithOffset.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeCumulateWindowWithOffset.out
@@ -224,11 +224,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$2",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -237,11 +234,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$3",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -293,11 +286,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$2",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -306,11 +296,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$3",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out
index 26f75c1..4c8d84a 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out
@@ -224,11 +224,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$1",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -237,11 +234,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$2",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -292,11 +285,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$1",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -305,11 +295,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$2",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindowWithOffset.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindowWithOffset.out
index 1051652..25d8c8f 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindowWithOffset.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindowWithOffset.out
@@ -224,11 +224,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$1",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -237,11 +234,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$2",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -293,11 +286,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$1",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -306,11 +296,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$2",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
index d5d30f2..6e258b3 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
@@ -224,11 +224,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$3",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -237,11 +234,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$4",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -250,11 +243,8 @@
       "type" : "INT"
     }, {
       "name" : "EXPR$5",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -263,14 +253,8 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$6",
-      "aggFunction" : {
-        "name" : "concat_distinct_agg",
-        "kind" : "OTHER_FUNCTION",
-        "syntax" : "FUNCTION",
-        "functionKind" : "AGGREGATE",
-        "instance" : "rO0ABXNyAF9vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucGxhbi51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkQ29uY2F0RGlzdGluY3RBZ2dGdW5jdGlvbtrVmfNk5uTFAgAAeHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uINSM3KFoG4kCAAB4cgA8b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuSW1wZXJhdGl2ZUFnZ3JlZ2F0ZUZ1bmN0aW9u8leA9q_NbKcCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvblloCwi7Qw8WAgAAeHA",
-        "bridging" : true
-      },
+      "catalogName" : "`default_catalog`.`default_database`.`concat_distinct_agg`",
+      "class" : "org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$ConcatDistinctAggFunction",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -482,11 +466,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$3",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -495,11 +476,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$4",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -508,11 +485,8 @@
       "type" : "INT"
     }, {
       "name" : "EXPR$5",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -521,14 +495,8 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$6",
-      "aggFunction" : {
-        "name" : "concat_distinct_agg",
-        "kind" : "OTHER_FUNCTION",
-        "syntax" : "FUNCTION",
-        "functionKind" : "AGGREGATE",
-        "instance" : "rO0ABXNyAF9vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucGxhbi51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkQ29uY2F0RGlzdGluY3RBZ2dGdW5jdGlvbtrVmfNk5uTFAgAAeHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uINSM3KFoG4kCAAB4cgA8b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuSW1wZXJhdGl2ZUFnZ3JlZ2F0ZUZ1bmN0aW9u8leA9q_NbKcCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvblloCwi7Qw8WAgAAeHA",
-        "bridging" : true
-      },
+      "catalogName" : "`default_catalog`.`default_database`.`concat_distinct_agg`",
+      "class" : "org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$ConcatDistinctAggFunction",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindowWithOffset.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindowWithOffset.out
index 5910460..ea0bd37 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindowWithOffset.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindowWithOffset.out
@@ -224,11 +224,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$3",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -237,11 +234,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$4",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -250,11 +243,8 @@
       "type" : "INT"
     }, {
       "name" : "EXPR$5",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -263,14 +253,8 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$6",
-      "aggFunction" : {
-        "name" : "concat_distinct_agg",
-        "kind" : "OTHER_FUNCTION",
-        "syntax" : "FUNCTION",
-        "functionKind" : "AGGREGATE",
-        "instance" : "rO0ABXNyAF9vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucGxhbi51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkQ29uY2F0RGlzdGluY3RBZ2dGdW5jdGlvbtrVmfNk5uTFAgAAeHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uINSM3KFoG4kCAAB4cgA8b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuSW1wZXJhdGl2ZUFnZ3JlZ2F0ZUZ1bmN0aW9u8leA9q_NbKcCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvblloCwi7Qw8WAgAAeHA",
-        "bridging" : true
-      },
+      "catalogName" : "`default_catalog`.`default_database`.`concat_distinct_agg`",
+      "class" : "org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$ConcatDistinctAggFunction",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -483,11 +467,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$3",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -496,11 +477,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$4",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -509,11 +486,8 @@
       "type" : "INT"
     }, {
       "name" : "EXPR$5",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -522,14 +496,8 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$6",
-      "aggFunction" : {
-        "name" : "concat_distinct_agg",
-        "kind" : "OTHER_FUNCTION",
-        "syntax" : "FUNCTION",
-        "functionKind" : "AGGREGATE",
-        "instance" : "rO0ABXNyAF9vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucGxhbi51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkQ29uY2F0RGlzdGluY3RBZ2dGdW5jdGlvbtrVmfNk5uTFAgAAeHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uINSM3KFoG4kCAAB4cgA8b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuSW1wZXJhdGl2ZUFnZ3JlZ2F0ZUZ1bmN0aW9u8leA9q_NbKcCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvblloCwi7Qw8WAgAAeHA",
-        "bridging" : true
-      },
+      "catalogName" : "`default_catalog`.`default_database`.`concat_distinct_agg`",
+      "class" : "org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$ConcatDistinctAggFunction",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testProcTimeCumulateWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testProcTimeCumulateWindow.out
index aae8b5f..0217355 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testProcTimeCumulateWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testProcTimeCumulateWindow.out
@@ -289,11 +289,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$1",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out
index 4895488..2973abc 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out
@@ -290,11 +290,7 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$1",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out
index 983c2a6..02faef6 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out
@@ -269,11 +269,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$2",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowJoinJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowJoinJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
index dddee0c..617e8b8 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowJoinJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowJoinJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
@@ -173,11 +173,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "cnt",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -186,11 +183,8 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "uv",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -314,11 +308,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "cnt",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -327,11 +318,8 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "uv",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -665,11 +653,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "cnt",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -678,11 +663,8 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "uv",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -806,11 +788,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "cnt",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -819,11 +798,8 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "uv",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : true,