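You are viewing a plain text version of this content. The canonical (hyperlinked) version is available from the original mailing-list archive; the link itself is not preserved in this plain text rendering.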
Posted to commits@flink.apache.org by tw...@apache.org on 2022/02/25 08:51:17 UTC

[flink] branch master updated (8d86133 -> 3d87483)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 8d86133  [FLINK-26121][runtime] Adds clearUnhandledEvents() method
     new 5c3c90d  [FLINK-26283][table-planner] Harden AggregateCall serialization in JSON plan
     new 3d87483  [hotfix][table-planner] Clean up serde classes

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../plan/abilities/sink/WritingMetadataSpec.java   |   6 -
 .../abilities/source/SourceAbilitySpecBase.java    |   6 -
 .../planner/plan/logical/WindowingStrategy.java    |   6 -
 .../table/planner/plan/nodes/exec/ExecNode.java    |   6 -
 .../planner/plan/nodes/exec/InputProperty.java     |   6 -
 .../plan/nodes/exec/common/CommonExecValues.java   |  10 +-
 .../exec/serde/AggregateCallJsonDeserializer.java  | 153 ++++-----------------
 .../exec/serde/AggregateCallJsonSerializer.java    |  89 ++++--------
 .../exec/serde/ChangelogModeJsonDeserializer.java  |  14 +-
 .../exec/serde/ChangelogModeJsonSerializer.java    |  11 +-
 .../nodes/exec/serde/ColumnJsonDeserializer.java   |  11 +-
 .../nodes/exec/serde/ColumnJsonSerializer.java     |  31 +++--
 .../ContextResolvedTableJsonDeserializer.java      |  11 +-
 .../serde/ContextResolvedTableJsonSerializer.java  |  16 ++-
 .../nodes/exec/serde/DataTypeJsonDeserializer.java |   4 +-
 .../nodes/exec/serde/DataTypeJsonSerializer.java   |  20 +--
 .../exec/serde/ExecNodeGraphJsonDeserializer.java  |  13 +-
 .../exec/serde/ExecNodeGraphJsonSerializer.java    |  11 +-
 .../exec/serde/FlinkVersionJsonDeserializer.java   |  12 +-
 .../exec/serde/FlinkVersionJsonSerializer.java     |  12 +-
 .../plan/nodes/exec/serde/JsonPlanEdge.java        |  24 ++--
 .../plan/nodes/exec/serde/JsonPlanGraph.java       |  16 ++-
 .../plan/nodes/exec/serde/JsonSerdeUtil.java       |  24 ++--
 .../exec/serde/LogicalTypeJsonDeserializer.java    |   4 +-
 .../exec/serde/LogicalTypeJsonSerializer.java      |  60 ++++----
 .../exec/serde/LogicalWindowJsonDeserializer.java  |  11 +-
 .../exec/serde/LogicalWindowJsonSerializer.java    |  39 +++---
 .../serde/ObjectIdentifierJsonDeserializer.java    |  12 +-
 .../exec/serde/ObjectIdentifierJsonSerializer.java |  12 +-
 .../exec/serde/RelDataTypeJsonDeserializer.java    |   4 +-
 .../exec/serde/RelDataTypeJsonSerializer.java      |   4 +-
 .../RequiredDistributionJsonDeserializer.java      |  15 +-
 .../serde/RequiredDistributionJsonSerializer.java  |  12 +-
 .../ResolvedCatalogTableJsonDeserializer.java      |  11 +-
 .../serde/ResolvedCatalogTableJsonSerializer.java  |  19 ++-
 .../serde/ResolvedExpressionJsonDeserializer.java  |  11 +-
 .../serde/ResolvedExpressionJsonSerializer.java    |  19 ++-
 .../exec/serde/ResolvedSchemaJsonDeserializer.java |  11 +-
 .../exec/serde/ResolvedSchemaJsonSerializer.java   |  17 ++-
 .../nodes/exec/serde/RexNodeJsonDeserializer.java  |  10 +-
 .../nodes/exec/serde/RexNodeJsonSerializer.java    |  65 ++++-----
 .../exec/serde/RexWindowBoundJsonDeserializer.java |  11 +-
 .../exec/serde/RexWindowBoundJsonSerializer.java   |  28 ++--
 .../plan/nodes/exec/serde/SerdeContext.java        |   2 +
 .../nodes/exec/serde/ShuffleJsonDeserializer.java  |  15 +-
 .../nodes/exec/serde/ShuffleJsonSerializer.java    |  12 +-
 .../nodes/exec/serde/UniqueConstraintMixin.java    |  10 +-
 .../plan/nodes/exec/serde/WatermarkSpecMixin.java  |   6 +-
 .../serde/WindowReferenceJsonDeserializer.java     |   4 +-
 .../exec/serde/WindowReferenceJsonSerializer.java  |   8 +-
 .../planner/plan/nodes/exec/spec/OverSpec.java     |   9 --
 .../stream/StreamExecGlobalGroupAggregate.java     |   6 -
 .../stream/StreamExecGlobalWindowAggregate.java    |   6 -
 .../stream/StreamExecGroupWindowAggregate.java     |   6 -
 .../StreamExecIncrementalGroupAggregate.java       |   6 -
 .../plan/nodes/exec/stream/StreamExecSink.java     |   6 -
 .../table/planner/plan/utils/LookupJoinUtil.java   |   6 -
 .../ExpandJsonPlanTest_jsonplan/testExpand.out     |  25 +---
 ...tDistinctAggCalls[isMiniBatchEnabled=false].out |  39 ++----
 ...stDistinctAggCalls[isMiniBatchEnabled=true].out |  78 +++--------
 ...gCallsWithGroupBy[isMiniBatchEnabled=false].out |  19 +--
 ...ggCallsWithGroupBy[isMiniBatchEnabled=true].out |  38 ++---
 ...AggWithoutGroupBy[isMiniBatchEnabled=false].out |  25 +---
 ...eAggWithoutGroupBy[isMiniBatchEnabled=true].out |  50 ++-----
 ...erDefinedAggCalls[isMiniBatchEnabled=false].out |  37 +----
 ...serDefinedAggCalls[isMiniBatchEnabled=true].out |  37 +----
 .../testEventTimeHopWindow.out                     |  13 +-
 .../testEventTimeSessionWindow.out                 |  13 +-
 .../testEventTimeTumbleWindow.out                  |  30 +---
 .../testProcTimeHopWindow.out                      |   6 +-
 .../testProcTimeSessionWindow.out                  |   6 +-
 .../testProcTimeTumbleWindow.out                   |   7 +-
 .../testIncrementalAggregate.out                   |  20 +--
 ...lAggregateWithSumCountDistinctAndRetraction.out |  84 +++--------
 .../testInnerJoinWithPk.out                        |  12 +-
 .../testProcTimeBoundedNonPartitionedRangeOver.out |   7 +-
 .../testProcTimeBoundedPartitionedRangeOver.out    |  13 +-
 ...undedPartitionedRowsOverWithBuiltinProctime.out |  19 +--
 .../testProcTimeUnboundedPartitionedRangeOver.out  |  13 +-
 ...stProctimeBoundedDistinctPartitionedRowOver.out |  13 +-
 ...edDistinctWithNonDistinctPartitionedRowOver.out |  33 ++---
 .../testRowTimeBoundedPartitionedRowsOver.out      |   7 +-
 .../testDistinctSplitEnabled.out                   |  76 +++-------
 .../testEventTimeCumulateWindow.out                |  26 +---
 .../testEventTimeCumulateWindowWithOffset.out      |  26 +---
 .../testEventTimeHopWindow.out                     |  26 +---
 .../testEventTimeHopWindowWithOffset.out           |  26 +---
 .../testEventTimeTumbleWindow.out                  |  60 ++------
 .../testEventTimeTumbleWindowWithOffset.out        |  60 ++------
 .../testProcTimeCumulateWindow.out                 |   7 +-
 .../testProcTimeHopWindow.out                      |   6 +-
 .../testProcTimeTumbleWindow.out                   |   7 +-
 .../testEventTimeTumbleWindow.out                  |  56 +++-----
 93 files changed, 682 insertions(+), 1327 deletions(-)

[flink] 01/02: [FLINK-26283][table-planner] Harden AggregateCall serialization in JSON plan

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5c3c90d29024370438d41255aa2650758d9c757f
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Feb 21 15:51:37 2022 +0100

    [FLINK-26283][table-planner] Harden AggregateCall serialization in JSON plan
    
    This closes #18872.
---
 .../exec/serde/AggregateCallJsonDeserializer.java  | 143 +++------------------
 .../exec/serde/AggregateCallJsonSerializer.java    |  73 +++--------
 .../nodes/exec/serde/RexNodeJsonDeserializer.java  |   6 +-
 .../nodes/exec/serde/RexNodeJsonSerializer.java    |   5 +-
 .../ExpandJsonPlanTest_jsonplan/testExpand.out     |  25 +---
 ...tDistinctAggCalls[isMiniBatchEnabled=false].out |  39 ++----
 ...stDistinctAggCalls[isMiniBatchEnabled=true].out |  78 +++--------
 ...gCallsWithGroupBy[isMiniBatchEnabled=false].out |  19 +--
 ...ggCallsWithGroupBy[isMiniBatchEnabled=true].out |  38 ++----
 ...AggWithoutGroupBy[isMiniBatchEnabled=false].out |  25 +---
 ...eAggWithoutGroupBy[isMiniBatchEnabled=true].out |  50 ++-----
 ...erDefinedAggCalls[isMiniBatchEnabled=false].out |  37 +-----
 ...serDefinedAggCalls[isMiniBatchEnabled=true].out |  37 +-----
 .../testEventTimeHopWindow.out                     |  13 +-
 .../testEventTimeSessionWindow.out                 |  13 +-
 .../testEventTimeTumbleWindow.out                  |  30 +----
 .../testProcTimeHopWindow.out                      |   6 +-
 .../testProcTimeSessionWindow.out                  |   6 +-
 .../testProcTimeTumbleWindow.out                   |   7 +-
 .../testIncrementalAggregate.out                   |  20 +--
 ...lAggregateWithSumCountDistinctAndRetraction.out |  84 +++---------
 .../testInnerJoinWithPk.out                        |  12 +-
 .../testProcTimeBoundedNonPartitionedRangeOver.out |   7 +-
 .../testProcTimeBoundedPartitionedRangeOver.out    |  13 +-
 ...undedPartitionedRowsOverWithBuiltinProctime.out |  19 +--
 .../testProcTimeUnboundedPartitionedRangeOver.out  |  13 +-
 ...stProctimeBoundedDistinctPartitionedRowOver.out |  13 +-
 ...edDistinctWithNonDistinctPartitionedRowOver.out |  33 ++---
 .../testRowTimeBoundedPartitionedRowsOver.out      |   7 +-
 .../testDistinctSplitEnabled.out                   |  76 +++--------
 .../testEventTimeCumulateWindow.out                |  26 +---
 .../testEventTimeCumulateWindowWithOffset.out      |  26 +---
 .../testEventTimeHopWindow.out                     |  26 +---
 .../testEventTimeHopWindowWithOffset.out           |  26 +---
 .../testEventTimeTumbleWindow.out                  |  60 ++-------
 .../testEventTimeTumbleWindowWithOffset.out        |  60 ++-------
 .../testProcTimeCumulateWindow.out                 |   7 +-
 .../testProcTimeHopWindow.out                      |   6 +-
 .../testProcTimeTumbleWindow.out                   |   7 +-
 .../testEventTimeTumbleWindow.out                  |  56 +++-----
 40 files changed, 268 insertions(+), 979 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonDeserializer.java
index 69c1cce..28eef53 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonDeserializer.java
@@ -18,24 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.catalog.ContextResolvedFunction;
-import org.apache.flink.table.catalog.DataTypeFactory;
-import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.functions.FunctionIdentifier;
-import org.apache.flink.table.functions.FunctionKind;
-import org.apache.flink.table.functions.ImperativeAggregateFunction;
-import org.apache.flink.table.functions.UserDefinedFunctionHelper;
-import org.apache.flink.table.module.CoreModule;
-import org.apache.flink.table.planner.functions.bridging.BridgingSqlAggFunction;
-import org.apache.flink.table.planner.functions.utils.AggSqlFunction;
-import org.apache.flink.table.types.inference.TypeInference;
-import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.table.utils.EncodingUtils;
-import org.apache.flink.util.Preconditions;
-
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
@@ -44,33 +27,17 @@ import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlAggFunction;
-import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.sql.SqlSyntax;
-import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.sql.validate.SqlNameMatchers;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
 
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_AGG_FUNCTION;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_APPROXIMATE;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_ARG_LIST;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_BRIDGING;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_BUILT_IN;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_DISPLAY_NAME;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_DISTINCT;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_FILTER_ARG;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_FUNCTION_KIND;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_IGNORE_NULLS;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_INSTANCE;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_KIND;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_NAME;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_REQUIRES_OVER;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_SYNTAX;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_TYPE;
 
 /**
@@ -86,24 +53,26 @@ public class AggregateCallJsonDeserializer extends StdDeserializer<AggregateCall
 
     @Override
     public AggregateCall deserialize(JsonParser jsonParser, DeserializationContext ctx)
-            throws IOException, JsonProcessingException {
-        JsonNode jsonNode = jsonParser.readValueAsTree();
-        JsonNode aggFunNode = jsonNode.get(FIELD_NAME_AGG_FUNCTION);
-        SqlAggFunction aggFunction = toSqlAggFunction(aggFunNode, SerdeContext.get(ctx));
+            throws IOException {
+        final JsonNode jsonNode = jsonParser.readValueAsTree();
+        final SerdeContext serdeContext = SerdeContext.get(ctx);
 
-        List<Integer> argList = new ArrayList<>();
-        JsonNode argListNode = jsonNode.get(FIELD_NAME_ARG_LIST);
+        final String name = jsonNode.required(FIELD_NAME_NAME).asText();
+        final SqlAggFunction aggFunction =
+                (SqlAggFunction)
+                        RexNodeJsonDeserializer.deserializeSqlOperator(jsonNode, serdeContext);
+        final List<Integer> argList = new ArrayList<>();
+        final JsonNode argListNode = jsonNode.required(FIELD_NAME_ARG_LIST);
         for (JsonNode argNode : argListNode) {
             argList.add(argNode.intValue());
         }
-        int filterArg = jsonNode.get(FIELD_NAME_FILTER_ARG).intValue();
-        boolean distinct = jsonNode.get(FIELD_NAME_DISTINCT).asBoolean();
-        boolean approximate = jsonNode.get(FIELD_NAME_APPROXIMATE).asBoolean();
-        boolean ignoreNulls = jsonNode.get(FIELD_NAME_IGNORE_NULLS).asBoolean();
-        JsonNode typeNode = jsonNode.get(FIELD_NAME_TYPE);
-        RelDataType relDataType =
-                ctx.readValue(typeNode.traverse(jsonParser.getCodec()), RelDataType.class);
-        String name = jsonNode.get(FIELD_NAME_NAME).asText();
+        final int filterArg = jsonNode.required(FIELD_NAME_FILTER_ARG).asInt();
+        final boolean distinct = jsonNode.required(FIELD_NAME_DISTINCT).asBoolean();
+        final boolean approximate = jsonNode.required(FIELD_NAME_APPROXIMATE).asBoolean();
+        final boolean ignoreNulls = jsonNode.required(FIELD_NAME_IGNORE_NULLS).asBoolean();
+        final RelDataType relDataType =
+                RelDataTypeJsonDeserializer.deserialize(
+                        jsonNode.required(FIELD_NAME_TYPE), serdeContext);
 
         return AggregateCall.create(
                 aggFunction,
@@ -116,84 +85,4 @@ public class AggregateCallJsonDeserializer extends StdDeserializer<AggregateCall
                 relDataType,
                 name);
     }
-
-    private SqlAggFunction toSqlAggFunction(JsonNode jsonNode, SerdeContext ctx)
-            throws IOException {
-        String name = jsonNode.get(FIELD_NAME_NAME).asText();
-        SqlKind sqlKind = SqlKind.valueOf(jsonNode.get(FIELD_NAME_KIND).asText());
-        SqlSyntax sqlSyntax = SqlSyntax.valueOf(jsonNode.get(FIELD_NAME_SYNTAX).asText());
-        List<SqlOperator> operators = new ArrayList<>();
-        ctx.getOperatorTable()
-                .lookupOperatorOverloads(
-                        new SqlIdentifier(name, new SqlParserPos(0, 0)),
-                        null, // category
-                        sqlSyntax,
-                        operators,
-                        SqlNameMatchers.liberal());
-        for (SqlOperator operator : operators) {
-            // in case different operator has the same kind, check with both name and kind.
-            if (operator.kind == sqlKind) {
-                return (SqlAggFunction) operator;
-            }
-        }
-
-        DataTypeFactory dataTypeFactory =
-                ctx.getFlinkContext().getCatalogManager().getDataTypeFactory();
-
-        // built-in function
-        // TODO supports other module's built-in function
-        if (jsonNode.has(FIELD_NAME_BUILT_IN) && jsonNode.get(FIELD_NAME_BUILT_IN).booleanValue()) {
-            Optional<FunctionDefinition> definition =
-                    CoreModule.INSTANCE.getFunctionDefinition(name);
-            Preconditions.checkArgument(definition.isPresent());
-            TypeInference typeInference = definition.get().getTypeInference(dataTypeFactory);
-            return BridgingSqlAggFunction.of(
-                    dataTypeFactory,
-                    ctx.getTypeFactory(),
-                    sqlKind,
-                    ContextResolvedFunction.permanent(
-                            FunctionIdentifier.of(name), definition.get()),
-                    typeInference);
-        }
-
-        if (jsonNode.has(FIELD_NAME_FUNCTION_KIND) && jsonNode.has(FIELD_NAME_INSTANCE)) {
-            FunctionKind functionKind =
-                    FunctionKind.valueOf(
-                            jsonNode.get(FIELD_NAME_FUNCTION_KIND).asText().toUpperCase());
-            String instanceStr = jsonNode.get(FIELD_NAME_INSTANCE).asText();
-            if (functionKind != FunctionKind.AGGREGATE) {
-                throw new TableException("Unknown function kind: " + functionKind);
-            }
-            if (jsonNode.has(FIELD_NAME_BRIDGING)
-                    && jsonNode.get(FIELD_NAME_BRIDGING).booleanValue()) {
-                FunctionDefinition definition =
-                        EncodingUtils.decodeStringToObject(instanceStr, ctx.getClassLoader());
-                TypeInference typeInference = definition.getTypeInference(dataTypeFactory);
-                return BridgingSqlAggFunction.of(
-                        dataTypeFactory,
-                        ctx.getTypeFactory(),
-                        sqlKind,
-                        ContextResolvedFunction.permanent(FunctionIdentifier.of(name), definition),
-                        typeInference);
-            } else {
-                String displayName = jsonNode.get(FIELD_NAME_DISPLAY_NAME).asText();
-                boolean requiresOver = jsonNode.get(FIELD_NAME_REQUIRES_OVER).booleanValue();
-                ImperativeAggregateFunction<?, ?> function =
-                        EncodingUtils.decodeStringToObject(instanceStr, ctx.getClassLoader());
-                return AggSqlFunction.apply(
-                        FunctionIdentifier.of(name),
-                        displayName,
-                        function,
-                        TypeConversions.fromLegacyInfoToDataType(
-                                UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(
-                                        function)),
-                        TypeConversions.fromLegacyInfoToDataType(
-                                UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(
-                                        function)),
-                        ctx.getTypeFactory(),
-                        requiresOver);
-            }
-        }
-        throw new TableException("Unknown operator: " + jsonNode.toPrettyString());
-    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java
index 27f2549..35e3ac9 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java
@@ -18,47 +18,35 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
-import org.apache.flink.table.functions.BuiltInFunctionDefinition;
-import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.functions.FunctionKind;
-import org.apache.flink.table.planner.functions.bridging.BridgingSqlAggFunction;
-import org.apache.flink.table.planner.functions.utils.AggSqlFunction;
-import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanCompilation;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
 
 import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.sql.SqlAggFunction;
 
 import java.io.IOException;
 
 /**
- * JSON serializer for {@link AggregateCall}. refer to {@link AggregateCallJsonDeserializer} for
- * deserializer.
+ * JSON serializer for {@link AggregateCall}.
+ *
+ * @see AggregateCallJsonDeserializer for the reverse operation
  */
+@Internal
 public class AggregateCallJsonSerializer extends StdSerializer<AggregateCall> {
     private static final long serialVersionUID = 1L;
 
-    public static final String FIELD_NAME_KIND = "kind";
-    public static final String FIELD_NAME_TYPE = "type";
     public static final String FIELD_NAME_NAME = "name";
-
-    public static final String FIELD_NAME_AGG_FUNCTION = "aggFunction";
-    public static final String FIELD_NAME_INSTANCE = "instance";
-    public static final String FIELD_NAME_SYNTAX = "syntax";
-    public static final String FIELD_NAME_DISPLAY_NAME = "displayName";
-    public static final String FIELD_NAME_FUNCTION_KIND = "functionKind";
-    public static final String FIELD_NAME_BRIDGING = "bridging";
-    public static final String FIELD_NAME_BUILT_IN = "builtIn";
-    public static final String FIELD_NAME_REQUIRES_OVER = "requiresOver";
-
     public static final String FIELD_NAME_ARG_LIST = "argList";
     public static final String FIELD_NAME_FILTER_ARG = "filterArg";
     public static final String FIELD_NAME_DISTINCT = "distinct";
     public static final String FIELD_NAME_APPROXIMATE = "approximate";
     public static final String FIELD_NAME_IGNORE_NULLS = "ignoreNulls";
+    public static final String FIELD_NAME_TYPE = "type";
 
     public AggregateCallJsonSerializer() {
         super(AggregateCall.class);
@@ -70,9 +58,17 @@ public class AggregateCallJsonSerializer extends StdSerializer<AggregateCall> {
             JsonGenerator jsonGenerator,
             SerializerProvider serializerProvider)
             throws IOException {
+        final ReadableConfig config = SerdeContext.get(serializerProvider).getConfiguration();
+        final CatalogPlanCompilation compilationStrategy =
+                config.get(TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS);
+
         jsonGenerator.writeStartObject();
         jsonGenerator.writeStringField(FIELD_NAME_NAME, aggCall.getName());
-        serialize(aggCall.getAggregation(), jsonGenerator);
+        RexNodeJsonSerializer.serializeSqlOperator(
+                aggCall.getAggregation(),
+                jsonGenerator,
+                serializerProvider,
+                compilationStrategy == CatalogPlanCompilation.ALL);
         jsonGenerator.writeFieldName(FIELD_NAME_ARG_LIST);
         jsonGenerator.writeStartArray();
         for (int arg : aggCall.getArgList()) {
@@ -86,37 +82,4 @@ public class AggregateCallJsonSerializer extends StdSerializer<AggregateCall> {
         serializerProvider.defaultSerializeField(FIELD_NAME_TYPE, aggCall.getType(), jsonGenerator);
         jsonGenerator.writeEndObject();
     }
-
-    private void serialize(SqlAggFunction operator, JsonGenerator gen) throws IOException {
-        gen.writeFieldName(FIELD_NAME_AGG_FUNCTION);
-        gen.writeStartObject();
-        gen.writeStringField(FIELD_NAME_NAME, operator.getName());
-        gen.writeStringField(FIELD_NAME_KIND, operator.kind.name());
-        gen.writeStringField(FIELD_NAME_SYNTAX, operator.getSyntax().name());
-        // TODO if a udf is registered with class name, class name is recorded enough
-        if (operator instanceof AggSqlFunction) {
-            AggSqlFunction aggSqlFunc = (AggSqlFunction) operator;
-            gen.writeStringField(FIELD_NAME_DISPLAY_NAME, aggSqlFunc.displayName());
-            gen.writeStringField(FIELD_NAME_FUNCTION_KIND, FunctionKind.AGGREGATE.name());
-            gen.writeBooleanField(FIELD_NAME_REQUIRES_OVER, aggSqlFunc.requiresOver());
-            gen.writeStringField(
-                    FIELD_NAME_INSTANCE,
-                    EncodingUtils.encodeObjectToString(aggSqlFunc.aggregateFunction()));
-        } else if (operator instanceof BridgingSqlAggFunction) {
-            BridgingSqlAggFunction bridgingSqlAggFunc = (BridgingSqlAggFunction) operator;
-            FunctionDefinition functionDefinition = bridgingSqlAggFunc.getDefinition();
-            if (functionDefinition instanceof BuiltInFunctionDefinition) {
-                // just record the flag, we can find it by name
-                gen.writeBooleanField(FIELD_NAME_BUILT_IN, true);
-            } else {
-                assert functionDefinition.getKind() == FunctionKind.AGGREGATE;
-                gen.writeStringField(FIELD_NAME_FUNCTION_KIND, FunctionKind.AGGREGATE.name());
-                gen.writeStringField(
-                        FIELD_NAME_INSTANCE,
-                        EncodingUtils.encodeObjectToString(functionDefinition));
-                gen.writeBooleanField(FIELD_NAME_BRIDGING, true);
-            }
-        }
-        gen.writeEndObject();
-    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
index 9c23498..ff9e322 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
@@ -317,8 +317,10 @@ public class RexNodeJsonDeserializer extends StdDeserializer<RexNode> {
         return serdeContext.getRexBuilder().makeCall(callType, operator, rexOperands);
     }
 
-    private static SqlOperator deserializeSqlOperator(
-            JsonNode jsonNode, SerdeContext serdeContext) {
+    // --------------------------------------------------------------------------------------------
+
+    /** Logic shared with {@link AggregateCallJsonDeserializer}. */
+    static SqlOperator deserializeSqlOperator(JsonNode jsonNode, SerdeContext serdeContext) {
         final SqlSyntax syntax;
         if (jsonNode.has(FIELD_NAME_SYNTAX)) {
             syntax =
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
index bd5a6e8..d38f08a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
@@ -359,7 +359,10 @@ public class RexNodeJsonSerializer extends StdSerializer<RexNode> {
         gen.writeEndObject();
     }
 
-    private static void serializeSqlOperator(
+    // --------------------------------------------------------------------------------------------
+
+    /** Logic shared with {@link AggregateCallJsonSerializer}. */
+    static void serializeSqlOperator(
             SqlOperator operator,
             JsonGenerator gen,
             SerializerProvider serializerProvider,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandJsonPlanTest_jsonplan/testExpand.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandJsonPlanTest_jsonplan/testExpand.out
index 3925e50..04d059e 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandJsonPlanTest_jsonplan/testExpand.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandJsonPlanTest_jsonplan/testExpand.out
@@ -235,11 +235,8 @@
     "grouping" : [ 0, 3, 4 ],
     "aggCalls" : [ {
       "name" : null,
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : 5,
       "distinct" : true,
@@ -248,11 +245,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "FIRST_VALUE",
-        "kind" : "FIRST_VALUE",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$FIRST_VALUE$1",
       "argList" : [ 2 ],
       "filterArg" : 6,
       "distinct" : false,
@@ -291,11 +284,7 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : null,
-      "aggFunction" : {
-        "name" : "$SUM0",
-        "kind" : "SUM0",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$$SUM0$1",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -304,11 +293,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "FIRST_VALUE",
-        "kind" : "FIRST_VALUE",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$FIRST_VALUE$1",
       "argList" : [ 4 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=false].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=false].out
index 23d50e2..5a69103 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=false].out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=false].out
@@ -97,11 +97,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "cnt_a1",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : 2,
       "distinct" : true,
@@ -110,11 +107,8 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "cnt_a2",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -123,11 +117,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "sum_a",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -136,11 +126,7 @@
       "type" : "BIGINT"
     }, {
       "name" : "sum_b",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -149,11 +135,7 @@
       "type" : "INT NOT NULL"
     }, {
       "name" : "avg_b",
-      "aggFunction" : {
-        "name" : "AVG",
-        "kind" : "AVG",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$AVG$1",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -162,11 +144,8 @@
       "type" : "INT NOT NULL"
     }, {
       "name" : "cnt_d",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 4 ],
       "filterArg" : -1,
       "distinct" : true,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out
index 37b6dad..d56ae50 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out
@@ -100,11 +100,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "cnt_a1",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : 2,
       "distinct" : true,
@@ -113,11 +110,8 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "cnt_a2",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -126,11 +120,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "sum_a",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -139,11 +129,7 @@
       "type" : "BIGINT"
     }, {
       "name" : "sum_b",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -152,11 +138,7 @@
       "type" : "INT NOT NULL"
     }, {
       "name" : "avg_b",
-      "aggFunction" : {
-        "name" : "AVG",
-        "kind" : "AVG",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$AVG$1",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -165,11 +147,8 @@
       "type" : "INT NOT NULL"
     }, {
       "name" : "cnt_d",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 4 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -361,11 +340,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "cnt_a1",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : 2,
       "distinct" : true,
@@ -374,11 +350,8 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "cnt_a2",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -387,11 +360,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "sum_a",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -400,11 +369,7 @@
       "type" : "BIGINT"
     }, {
       "name" : "sum_b",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -413,11 +378,7 @@
       "type" : "INT NOT NULL"
     }, {
       "name" : "avg_b",
-      "aggFunction" : {
-        "name" : "AVG",
-        "kind" : "AVG",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$AVG$1",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -426,11 +387,8 @@
       "type" : "INT NOT NULL"
     }, {
       "name" : "cnt_d",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 4 ],
       "filterArg" : -1,
       "distinct" : true,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=false].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=false].out
index 979378f..54e1aa4 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=false].out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=false].out
@@ -102,11 +102,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "cnt_a",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -115,11 +112,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "max_b",
-      "aggFunction" : {
-        "name" : "MAX",
-        "kind" : "MAX",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$MAX$1",
       "argList" : [ 0 ],
       "filterArg" : 2,
       "distinct" : false,
@@ -128,11 +121,7 @@
       "type" : "INT"
     }, {
       "name" : "min_c",
-      "aggFunction" : {
-        "name" : "MIN",
-        "kind" : "MIN",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$MIN$1",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=true].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=true].out
index 155dea2..6d0adeb 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=true].out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=true].out
@@ -105,11 +105,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "cnt_a",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -118,11 +115,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "max_b",
-      "aggFunction" : {
-        "name" : "MAX",
-        "kind" : "MAX",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$MAX$1",
       "argList" : [ 0 ],
       "filterArg" : 2,
       "distinct" : false,
@@ -131,11 +124,7 @@
       "type" : "INT"
     }, {
       "name" : "min_c",
-      "aggFunction" : {
-        "name" : "MIN",
-        "kind" : "MIN",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$MIN$1",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -173,11 +162,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "cnt_a",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -186,11 +172,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "max_b",
-      "aggFunction" : {
-        "name" : "MAX",
-        "kind" : "MAX",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$MAX$1",
       "argList" : [ 0 ],
       "filterArg" : 2,
       "distinct" : false,
@@ -199,11 +181,7 @@
       "type" : "INT"
     }, {
       "name" : "min_c",
-      "aggFunction" : {
-        "name" : "MIN",
-        "kind" : "MIN",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$MIN$1",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=false].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=false].out
index 60cc1ac..c8d7b01 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=false].out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=false].out
@@ -107,11 +107,7 @@
     "grouping" : [ ],
     "aggCalls" : [ {
       "name" : "avg_a",
-      "aggFunction" : {
-        "name" : "AVG",
-        "kind" : "AVG",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$AVG$1",
       "argList" : [ 0 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -120,11 +116,8 @@
       "type" : "BIGINT"
     }, {
       "name" : "cnt",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -133,11 +126,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "min_b",
-      "aggFunction" : {
-        "name" : "MIN",
-        "kind" : "MIN",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$MIN$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -146,11 +135,7 @@
       "type" : "INT"
     }, {
       "name" : "max_c",
-      "aggFunction" : {
-        "name" : "MAX",
-        "kind" : "MAX",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$MAX$1",
       "argList" : [ 2 ],
       "filterArg" : 3,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=true].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=true].out
index 89833ab..c53e463 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=true].out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggWithoutGroupBy[isMiniBatchEnabled=true].out
@@ -111,11 +111,7 @@
     "grouping" : [ ],
     "aggCalls" : [ {
       "name" : "avg_a",
-      "aggFunction" : {
-        "name" : "AVG",
-        "kind" : "AVG",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$AVG$1",
       "argList" : [ 0 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -124,11 +120,8 @@
       "type" : "BIGINT"
     }, {
       "name" : "cnt",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -137,11 +130,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "min_b",
-      "aggFunction" : {
-        "name" : "MIN",
-        "kind" : "MIN",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$MIN$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -150,11 +139,7 @@
       "type" : "INT"
     }, {
       "name" : "max_c",
-      "aggFunction" : {
-        "name" : "MAX",
-        "kind" : "MAX",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$MAX$1",
       "argList" : [ 2 ],
       "filterArg" : 3,
       "distinct" : false,
@@ -191,11 +176,7 @@
     "grouping" : [ ],
     "aggCalls" : [ {
       "name" : "avg_a",
-      "aggFunction" : {
-        "name" : "AVG",
-        "kind" : "AVG",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$AVG$1",
       "argList" : [ 0 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -204,11 +185,8 @@
       "type" : "BIGINT"
     }, {
       "name" : "cnt",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -217,11 +195,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "min_b",
-      "aggFunction" : {
-        "name" : "MIN",
-        "kind" : "MIN",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$MIN$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -230,11 +204,7 @@
       "type" : "INT"
     }, {
       "name" : "max_c",
-      "aggFunction" : {
-        "name" : "MAX",
-        "kind" : "MAX",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$MAX$1",
       "argList" : [ 2 ],
       "filterArg" : 3,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=false].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=false].out
index b278f6a..cc12b89 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=false].out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=false].out
@@ -91,14 +91,7 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "a1",
-      "aggFunction" : {
-        "name" : "my_sum1",
-        "kind" : "OTHER_FUNCTION",
-        "syntax" : "FUNCTION",
-        "functionKind" : "AGGREGATE",
-        "instance" : "rO0ABXNyAFhvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucGxhbi51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkVmFyU3VtMUFnZ0Z1bmN0aW9uUncPGXj5ZSICAAB4cgAyb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuQWdncmVnYXRlRnVuY3Rpb24g1IzcoWgbiQIAAHhyADxvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5JbXBlcmF0aXZlQWdncmVnYXRlRnVuY3Rpb27yV4D2r81spwIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9uWWgLCLtDDxYCAAB4cA",
-        "bridging" : true
-      },
+      "catalogName" : "`default_catalog`.`default_database`.`my_sum1`",
       "argList" : [ 0, 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -107,14 +100,7 @@
       "type" : "BIGINT"
     }, {
       "name" : "a2",
-      "aggFunction" : {
-        "name" : "my_sum2",
-        "kind" : "OTHER_FUNCTION",
-        "syntax" : "FUNCTION",
-        "functionKind" : "AGGREGATE",
-        "instance" : "rO0ABXNyAFhvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucGxhbi51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkVmFyU3VtMkFnZ0Z1bmN0aW9ucnj2tPeOlPcCAAB4cgAyb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuQWdncmVnYXRlRnVuY3Rpb24g1IzcoWgbiQIAAHhyADxvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5JbXBlcmF0aXZlQWdncmVnYXRlRnVuY3Rpb27yV4D2r81spwIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9uWWgLCLtDDxYCAAB4cA",
-        "bridging" : true
-      },
+      "systemName" : "my_sum2",
       "argList" : [ 2, 0 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -123,14 +109,8 @@
       "type" : "BIGINT"
     }, {
       "name" : "a3",
-      "aggFunction" : {
-        "name" : "my_avg",
-        "kind" : "OTHER_FUNCTION",
-        "syntax" : "FUNCTION",
-        "functionKind" : "AGGREGATE",
-        "instance" : "rO0ABXNyAFFvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucGxhbi51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkV2VpZ2h0ZWRBdmctK0s1MInyIQIAAHhyADJvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5BZ2dyZWdhdGVGdW5jdGlvbiDUjNyhaBuJAgAAeHIAPG9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLkltcGVyYXRpdmVBZ2dyZWdhdGVGdW5jdGlvbvJXgPavzWynAgAAeHIANG9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLlVzZXJEZWZpbmVkRnVuY3Rpb25ZaAsIu0MPFgIAAHhw",
-        "bridging" : true
-      },
+      "catalogName" : "`default_catalog`.`default_database`.`my_avg`",
+      "class" : "org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$WeightedAvg",
       "argList" : [ 3, 4 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -139,14 +119,7 @@
       "type" : "BIGINT"
     }, {
       "name" : "c1",
-      "aggFunction" : {
-        "name" : "my_count",
-        "kind" : "OTHER_FUNCTION",
-        "syntax" : "FUNCTION",
-        "functionKind" : "AGGREGATE",
-        "instance" : "rO0ABXNyAFNvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucGxhbi51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkQ291bnREaXN0aW5jdIvbEsgDUXHeAgAAeHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uINSM3KFoG4kCAAB4cgA8b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuSW1wZXJhdGl2ZUFnZ3JlZ2F0ZUZ1bmN0aW9u8leA9q_NbKcCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvblloCwi7Qw8WAgAAeHA",
-        "bridging" : true
-      },
+      "systemName" : "my_count",
       "argList" : [ 5 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=true].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=true].out
index b03d962..41687d0 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=true].out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=true].out
@@ -107,14 +107,7 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "a1",
-      "aggFunction" : {
-        "name" : "my_sum1",
-        "kind" : "OTHER_FUNCTION",
-        "syntax" : "FUNCTION",
-        "functionKind" : "AGGREGATE",
-        "instance" : "rO0ABXNyAFhvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucGxhbi51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkVmFyU3VtMUFnZ0Z1bmN0aW9uUncPGXj5ZSICAAB4cgAyb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuQWdncmVnYXRlRnVuY3Rpb24g1IzcoWgbiQIAAHhyADxvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5JbXBlcmF0aXZlQWdncmVnYXRlRnVuY3Rpb27yV4D2r81spwIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9uWWgLCLtDDxYCAAB4cA",
-        "bridging" : true
-      },
+      "catalogName" : "`default_catalog`.`default_database`.`my_sum1`",
       "argList" : [ 0, 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -123,14 +116,7 @@
       "type" : "BIGINT"
     }, {
       "name" : "a2",
-      "aggFunction" : {
-        "name" : "my_sum2",
-        "kind" : "OTHER_FUNCTION",
-        "syntax" : "FUNCTION",
-        "functionKind" : "AGGREGATE",
-        "instance" : "rO0ABXNyAFhvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucGxhbi51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkVmFyU3VtMkFnZ0Z1bmN0aW9ucnj2tPeOlPcCAAB4cgAyb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuQWdncmVnYXRlRnVuY3Rpb24g1IzcoWgbiQIAAHhyADxvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5JbXBlcmF0aXZlQWdncmVnYXRlRnVuY3Rpb27yV4D2r81spwIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9uWWgLCLtDDxYCAAB4cA",
-        "bridging" : true
-      },
+      "systemName" : "my_sum2",
       "argList" : [ 2, 0 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -139,14 +125,8 @@
       "type" : "BIGINT"
     }, {
       "name" : "a3",
-      "aggFunction" : {
-        "name" : "my_avg",
-        "kind" : "OTHER_FUNCTION",
-        "syntax" : "FUNCTION",
-        "functionKind" : "AGGREGATE",
-        "instance" : "rO0ABXNyAFFvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucGxhbi51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkV2VpZ2h0ZWRBdmctK0s1MInyIQIAAHhyADJvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5BZ2dyZWdhdGVGdW5jdGlvbiDUjNyhaBuJAgAAeHIAPG9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLkltcGVyYXRpdmVBZ2dyZWdhdGVGdW5jdGlvbvJXgPavzWynAgAAeHIANG9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLlVzZXJEZWZpbmVkRnVuY3Rpb25ZaAsIu0MPFgIAAHhw",
-        "bridging" : true
-      },
+      "catalogName" : "`default_catalog`.`default_database`.`my_avg`",
+      "class" : "org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$WeightedAvg",
       "argList" : [ 3, 4 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -155,14 +135,7 @@
       "type" : "BIGINT"
     }, {
       "name" : "c1",
-      "aggFunction" : {
-        "name" : "my_count",
-        "kind" : "OTHER_FUNCTION",
-        "syntax" : "FUNCTION",
-        "functionKind" : "AGGREGATE",
-        "instance" : "rO0ABXNyAFNvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucGxhbi51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkQ291bnREaXN0aW5jdIvbEsgDUXHeAgAAeHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uINSM3KFoG4kCAAB4cgA8b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuSW1wZXJhdGl2ZUFnZ3JlZ2F0ZUZ1bmN0aW9u8leA9q_NbKcCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvblloCwi7Qw8WAgAAeHA",
-        "bridging" : true
-      },
+      "systemName" : "my_count",
       "argList" : [ 5 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out
index ff5a5c6..aef0a94 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out
@@ -203,11 +203,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$1",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -216,11 +213,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$2",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out
index ef5f44c..a0a44cb 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out
@@ -203,11 +203,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$1",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -216,11 +213,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$2",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
index 442e2b9..f41b8d2 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
@@ -203,11 +203,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$3",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -216,11 +213,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$4",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -229,11 +222,8 @@
       "type" : "INT"
     }, {
       "name" : "EXPR$5",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -242,14 +232,8 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$6",
-      "aggFunction" : {
-        "name" : "concat_distinct_agg",
-        "kind" : "OTHER_FUNCTION",
-        "syntax" : "FUNCTION",
-        "functionKind" : "AGGREGATE",
-        "instance" : "rO0ABXNyAF9vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucGxhbi51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkQ29uY2F0RGlzdGluY3RBZ2dGdW5jdGlvbtrVmfNk5uTFAgAAeHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uINSM3KFoG4kCAAB4cgA8b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuSW1wZXJhdGl2ZUFnZ3JlZ2F0ZUZ1bmN0aW9u8leA9q_NbKcCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvblloCwi7Qw8WAgAAeHA",
-        "bridging" : true
-      },
+      "catalogName" : "`default_catalog`.`default_database`.`concat_distinct_agg`",
+      "class" : "org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$ConcatDistinctAggFunction",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out
index 825f324..1c9fbb7 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out
@@ -290,11 +290,7 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$1",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out
index 5f3bb87..b44c9c4 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out
@@ -290,11 +290,7 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$1",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out
index f96a1c5..d9c1ed0 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out
@@ -269,11 +269,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$2",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out
index e2d4464..76cc64f 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out
@@ -105,11 +105,8 @@
     "grouping" : [ 0, 2 ],
     "aggCalls" : [ {
       "name" : null,
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -216,11 +213,8 @@
     "finalAggGrouping" : [ 0 ],
     "partialOriginalAggCalls" : [ {
       "name" : null,
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -259,11 +253,7 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : null,
-      "aggFunction" : {
-        "name" : "$SUM0",
-        "kind" : "SUM0",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$$SUM0$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out
index b5a5b58..abb17ca 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out
@@ -65,11 +65,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "b",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -78,11 +75,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "b1",
-      "aggFunction" : {
-        "name" : "MAX",
-        "kind" : "MAX",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$MAX$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -120,11 +113,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "b",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -133,11 +123,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "b1",
-      "aggFunction" : {
-        "name" : "MAX",
-        "kind" : "MAX",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$MAX$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -204,11 +190,7 @@
     "grouping" : [ 0, 2 ],
     "aggCalls" : [ {
       "name" : null,
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -217,11 +199,8 @@
       "type" : "INT NOT NULL"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -230,11 +209,8 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -343,11 +319,7 @@
     "finalAggGrouping" : [ 0 ],
     "partialOriginalAggCalls" : [ {
       "name" : null,
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -356,11 +328,8 @@
       "type" : "INT NOT NULL"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -369,11 +338,8 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -412,11 +378,7 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : null,
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -425,11 +387,7 @@
       "type" : "INT NOT NULL"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "$SUM0",
-        "kind" : "SUM0",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$$SUM0$1",
       "argList" : [ 3 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -438,11 +396,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "$SUM0",
-        "kind" : "SUM0",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$$SUM0$1",
       "argList" : [ 4 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithPk.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithPk.out
index 5326ed8..3bd9221 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithPk.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithPk.out
@@ -59,11 +59,7 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "a2",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -177,11 +173,7 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "b2",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out
index 92f7c91..2541c41 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out
@@ -285,11 +285,8 @@
         },
         "aggCalls" : [ {
           "name" : "w0$o0",
-          "aggFunction" : {
-            "name" : "COUNT",
-            "kind" : "COUNT",
-            "syntax" : "FUNCTION_STAR"
-          },
+          "syntax" : "FUNCTION_STAR",
+          "internalName" : "$COUNT$1",
           "argList" : [ 0 ],
           "filterArg" : -1,
           "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out
index 84130e2..461837d 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out
@@ -296,11 +296,8 @@
         },
         "aggCalls" : [ {
           "name" : "w0$o0",
-          "aggFunction" : {
-            "name" : "COUNT",
-            "kind" : "COUNT",
-            "syntax" : "FUNCTION_STAR"
-          },
+          "syntax" : "FUNCTION_STAR",
+          "internalName" : "$COUNT$1",
           "argList" : [ 1 ],
           "filterArg" : -1,
           "distinct" : false,
@@ -309,11 +306,7 @@
           "type" : "BIGINT NOT NULL"
         }, {
           "name" : "w0$o1",
-          "aggFunction" : {
-            "name" : "$SUM0",
-            "kind" : "SUM0",
-            "syntax" : "FUNCTION"
-          },
+          "internalName" : "$$SUM0$1",
           "argList" : [ 1 ],
           "filterArg" : -1,
           "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime.out
index 2c7b5d6..fba1d48 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRowsOverWithBuiltinProctime.out
@@ -233,11 +233,8 @@
         },
         "aggCalls" : [ {
           "name" : "w0$o0",
-          "aggFunction" : {
-            "name" : "COUNT",
-            "kind" : "COUNT",
-            "syntax" : "FUNCTION_STAR"
-          },
+          "syntax" : "FUNCTION_STAR",
+          "internalName" : "$COUNT$1",
           "argList" : [ 1 ],
           "filterArg" : -1,
           "distinct" : false,
@@ -246,11 +243,7 @@
           "type" : "BIGINT NOT NULL"
         }, {
           "name" : "w0$o1",
-          "aggFunction" : {
-            "name" : "$SUM0",
-            "kind" : "SUM0",
-            "syntax" : "FUNCTION"
-          },
+          "internalName" : "$$SUM0$1",
           "argList" : [ 1 ],
           "filterArg" : -1,
           "distinct" : false,
@@ -259,11 +252,7 @@
           "type" : "BIGINT NOT NULL"
         }, {
           "name" : "w0$o2",
-          "aggFunction" : {
-            "name" : "MIN",
-            "kind" : "MIN",
-            "syntax" : "FUNCTION"
-          },
+          "internalName" : "$MIN$1",
           "argList" : [ 1 ],
           "filterArg" : -1,
           "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out
index 6ae8418..6f7c396 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out
@@ -274,11 +274,8 @@
         },
         "aggCalls" : [ {
           "name" : "w0$o0",
-          "aggFunction" : {
-            "name" : "COUNT",
-            "kind" : "COUNT",
-            "syntax" : "FUNCTION_STAR"
-          },
+          "syntax" : "FUNCTION_STAR",
+          "internalName" : "$COUNT$1",
           "argList" : [ 0 ],
           "filterArg" : -1,
           "distinct" : false,
@@ -287,11 +284,7 @@
           "type" : "BIGINT NOT NULL"
         }, {
           "name" : "w0$o1",
-          "aggFunction" : {
-            "name" : "$SUM0",
-            "kind" : "SUM0",
-            "syntax" : "FUNCTION"
-          },
+          "internalName" : "$$SUM0$1",
           "argList" : [ 0 ],
           "filterArg" : -1,
           "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProctimeBoundedDistinctPartitionedRowOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProctimeBoundedDistinctPartitionedRowOver.out
index c457ed4..6c8d61c 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProctimeBoundedDistinctPartitionedRowOver.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProctimeBoundedDistinctPartitionedRowOver.out
@@ -280,11 +280,8 @@
         },
         "aggCalls" : [ {
           "name" : "w0$o0",
-          "aggFunction" : {
-            "name" : "COUNT",
-            "kind" : "COUNT",
-            "syntax" : "FUNCTION_STAR"
-          },
+          "syntax" : "FUNCTION_STAR",
+          "internalName" : "$COUNT$1",
           "argList" : [ 0 ],
           "filterArg" : -1,
           "distinct" : true,
@@ -293,11 +290,7 @@
           "type" : "BIGINT NOT NULL"
         }, {
           "name" : "w0$o1",
-          "aggFunction" : {
-            "name" : "$SUM0",
-            "kind" : "SUM0",
-            "syntax" : "FUNCTION"
-          },
+          "internalName" : "$$SUM0$1",
           "argList" : [ 0 ],
           "filterArg" : -1,
           "distinct" : true,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProctimeBoundedDistinctWithNonDistinctPartitionedRowOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProctimeBoundedDistinctWithNonDistinctPartitionedRowOver.out
index 2267835..3e8b12d 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProctimeBoundedDistinctWithNonDistinctPartitionedRowOver.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testProctimeBoundedDistinctWithNonDistinctPartitionedRowOver.out
@@ -291,11 +291,8 @@
         },
         "aggCalls" : [ {
           "name" : "w0$o0",
-          "aggFunction" : {
-            "name" : "COUNT",
-            "kind" : "COUNT",
-            "syntax" : "FUNCTION_STAR"
-          },
+          "syntax" : "FUNCTION_STAR",
+          "internalName" : "$COUNT$1",
           "argList" : [ 0 ],
           "filterArg" : -1,
           "distinct" : false,
@@ -304,11 +301,7 @@
           "type" : "BIGINT NOT NULL"
         }, {
           "name" : "w0$o1",
-          "aggFunction" : {
-            "name" : "$SUM0",
-            "kind" : "SUM0",
-            "syntax" : "FUNCTION"
-          },
+          "internalName" : "$$SUM0$1",
           "argList" : [ 0 ],
           "filterArg" : -1,
           "distinct" : false,
@@ -317,11 +310,8 @@
           "type" : "INT NOT NULL"
         }, {
           "name" : "w0$o2",
-          "aggFunction" : {
-            "name" : "COUNT",
-            "kind" : "COUNT",
-            "syntax" : "FUNCTION_STAR"
-          },
+          "syntax" : "FUNCTION_STAR",
+          "internalName" : "$COUNT$1",
           "argList" : [ 0 ],
           "filterArg" : -1,
           "distinct" : true,
@@ -330,11 +320,8 @@
           "type" : "BIGINT NOT NULL"
         }, {
           "name" : "w0$o3",
-          "aggFunction" : {
-            "name" : "COUNT",
-            "kind" : "COUNT",
-            "syntax" : "FUNCTION_STAR"
-          },
+          "syntax" : "FUNCTION_STAR",
+          "internalName" : "$COUNT$1",
           "argList" : [ 2 ],
           "filterArg" : -1,
           "distinct" : true,
@@ -343,11 +330,7 @@
           "type" : "BIGINT NOT NULL"
         }, {
           "name" : "w0$o4",
-          "aggFunction" : {
-            "name" : "$SUM0",
-            "kind" : "SUM0",
-            "syntax" : "FUNCTION"
-          },
+          "internalName" : "$$SUM0$1",
           "argList" : [ 2 ],
           "filterArg" : -1,
           "distinct" : true,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testRowTimeBoundedPartitionedRowsOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testRowTimeBoundedPartitionedRowsOver.out
index 4d9d22c..cd7d26f 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testRowTimeBoundedPartitionedRowsOver.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateJsonPlanTest_jsonplan/testRowTimeBoundedPartitionedRowsOver.out
@@ -167,11 +167,8 @@
         },
         "aggCalls" : [ {
           "name" : "w0$o0",
-          "aggFunction" : {
-            "name" : "COUNT",
-            "kind" : "COUNT",
-            "syntax" : "FUNCTION_STAR"
-          },
+          "syntax" : "FUNCTION_STAR",
+          "internalName" : "$COUNT$1",
           "argList" : [ 0 ],
           "filterArg" : -1,
           "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testDistinctSplitEnabled.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testDistinctSplitEnabled.out
index ec56389..be85b4c 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testDistinctSplitEnabled.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testDistinctSplitEnabled.out
@@ -245,11 +245,8 @@
     "grouping" : [ 0, 3 ],
     "aggCalls" : [ {
       "name" : null,
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -258,11 +255,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -271,11 +264,8 @@
       "type" : "BIGINT"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -412,11 +402,8 @@
     "grouping" : [ 0, 1 ],
     "aggCalls" : [ {
       "name" : null,
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -425,11 +412,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -438,11 +421,8 @@
       "type" : "BIGINT"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -572,11 +552,7 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : null,
-      "aggFunction" : {
-        "name" : "$SUM0",
-        "kind" : "SUM0",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$$SUM0$1",
       "argList" : [ 4 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -585,11 +561,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 5 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -598,11 +570,7 @@
       "type" : "BIGINT"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "$SUM0",
-        "kind" : "SUM0",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$$SUM0$1",
       "argList" : [ 6 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -654,11 +622,7 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : null,
-      "aggFunction" : {
-        "name" : "$SUM0",
-        "kind" : "SUM0",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$$SUM0$1",
       "argList" : [ 4 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -667,11 +631,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 5 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -680,11 +640,7 @@
       "type" : "BIGINT"
     }, {
       "name" : null,
-      "aggFunction" : {
-        "name" : "$SUM0",
-        "kind" : "SUM0",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$$SUM0$1",
       "argList" : [ 6 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeCumulateWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeCumulateWindow.out
index 6f1dc2f..4d3c3dc 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeCumulateWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeCumulateWindow.out
@@ -224,11 +224,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$2",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -237,11 +234,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$3",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -292,11 +285,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$2",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -305,11 +295,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$3",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeCumulateWindowWithOffset.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeCumulateWindowWithOffset.out
index 9eefa48..3915c80 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeCumulateWindowWithOffset.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeCumulateWindowWithOffset.out
@@ -224,11 +224,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$2",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -237,11 +234,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$3",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -293,11 +286,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$2",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -306,11 +296,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$3",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out
index 26f75c1..4c8d84a 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out
@@ -224,11 +224,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$1",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -237,11 +234,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$2",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -292,11 +285,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$1",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -305,11 +295,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$2",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindowWithOffset.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindowWithOffset.out
index 1051652..25d8c8f 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindowWithOffset.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindowWithOffset.out
@@ -224,11 +224,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$1",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -237,11 +234,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$2",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -293,11 +286,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$1",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -306,11 +296,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$2",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
index d5d30f2..6e258b3 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
@@ -224,11 +224,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$3",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -237,11 +234,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$4",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -250,11 +243,8 @@
       "type" : "INT"
     }, {
       "name" : "EXPR$5",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -263,14 +253,8 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$6",
-      "aggFunction" : {
-        "name" : "concat_distinct_agg",
-        "kind" : "OTHER_FUNCTION",
-        "syntax" : "FUNCTION",
-        "functionKind" : "AGGREGATE",
-        "instance" : "rO0ABXNyAF9vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucGxhbi51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkQ29uY2F0RGlzdGluY3RBZ2dGdW5jdGlvbtrVmfNk5uTFAgAAeHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uINSM3KFoG4kCAAB4cgA8b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuSW1wZXJhdGl2ZUFnZ3JlZ2F0ZUZ1bmN0aW9u8leA9q_NbKcCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvblloCwi7Qw8WAgAAeHA",
-        "bridging" : true
-      },
+      "catalogName" : "`default_catalog`.`default_database`.`concat_distinct_agg`",
+      "class" : "org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$ConcatDistinctAggFunction",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -482,11 +466,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$3",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -495,11 +476,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$4",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -508,11 +485,8 @@
       "type" : "INT"
     }, {
       "name" : "EXPR$5",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -521,14 +495,8 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$6",
-      "aggFunction" : {
-        "name" : "concat_distinct_agg",
-        "kind" : "OTHER_FUNCTION",
-        "syntax" : "FUNCTION",
-        "functionKind" : "AGGREGATE",
-        "instance" : "rO0ABXNyAF9vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucGxhbi51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkQ29uY2F0RGlzdGluY3RBZ2dGdW5jdGlvbtrVmfNk5uTFAgAAeHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uINSM3KFoG4kCAAB4cgA8b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuSW1wZXJhdGl2ZUFnZ3JlZ2F0ZUZ1bmN0aW9u8leA9q_NbKcCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvblloCwi7Qw8WAgAAeHA",
-        "bridging" : true
-      },
+      "catalogName" : "`default_catalog`.`default_database`.`concat_distinct_agg`",
+      "class" : "org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$ConcatDistinctAggFunction",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindowWithOffset.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindowWithOffset.out
index 5910460..ea0bd37 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindowWithOffset.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindowWithOffset.out
@@ -224,11 +224,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$3",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -237,11 +234,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$4",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -250,11 +243,8 @@
       "type" : "INT"
     }, {
       "name" : "EXPR$5",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -263,14 +253,8 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$6",
-      "aggFunction" : {
-        "name" : "concat_distinct_agg",
-        "kind" : "OTHER_FUNCTION",
-        "syntax" : "FUNCTION",
-        "functionKind" : "AGGREGATE",
-        "instance" : "rO0ABXNyAF9vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucGxhbi51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkQ29uY2F0RGlzdGluY3RBZ2dGdW5jdGlvbtrVmfNk5uTFAgAAeHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uINSM3KFoG4kCAAB4cgA8b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuSW1wZXJhdGl2ZUFnZ3JlZ2F0ZUZ1bmN0aW9u8leA9q_NbKcCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvblloCwi7Qw8WAgAAeHA",
-        "bridging" : true
-      },
+      "catalogName" : "`default_catalog`.`default_database`.`concat_distinct_agg`",
+      "class" : "org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$ConcatDistinctAggFunction",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -483,11 +467,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$3",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -496,11 +477,7 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$4",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
@@ -509,11 +486,8 @@
       "type" : "INT"
     }, {
       "name" : "EXPR$5",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -522,14 +496,8 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "EXPR$6",
-      "aggFunction" : {
-        "name" : "concat_distinct_agg",
-        "kind" : "OTHER_FUNCTION",
-        "syntax" : "FUNCTION",
-        "functionKind" : "AGGREGATE",
-        "instance" : "rO0ABXNyAF9vcmcuYXBhY2hlLmZsaW5rLnRhYmxlLnBsYW5uZXIucGxhbi51dGlscy5KYXZhVXNlckRlZmluZWRBZ2dGdW5jdGlvbnMkQ29uY2F0RGlzdGluY3RBZ2dGdW5jdGlvbtrVmfNk5uTFAgAAeHIAMm9yZy5hcGFjaGUuZmxpbmsudGFibGUuZnVuY3Rpb25zLkFnZ3JlZ2F0ZUZ1bmN0aW9uINSM3KFoG4kCAAB4cgA8b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuSW1wZXJhdGl2ZUFnZ3JlZ2F0ZUZ1bmN0aW9u8leA9q_NbKcCAAB4cgA0b3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuVXNlckRlZmluZWRGdW5jdGlvblloCwi7Qw8WAgAAeHA",
-        "bridging" : true
-      },
+      "catalogName" : "`default_catalog`.`default_database`.`concat_distinct_agg`",
+      "class" : "org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$ConcatDistinctAggFunction",
       "argList" : [ 2 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testProcTimeCumulateWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testProcTimeCumulateWindow.out
index aae8b5f..0217355 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testProcTimeCumulateWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testProcTimeCumulateWindow.out
@@ -289,11 +289,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$1",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out
index 4895488..2973abc 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out
@@ -290,11 +290,7 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$1",
-      "aggFunction" : {
-        "name" : "SUM",
-        "kind" : "SUM",
-        "syntax" : "FUNCTION"
-      },
+      "internalName" : "$SUM$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out
index 983c2a6..02faef6 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out
@@ -269,11 +269,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "EXPR$2",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowJoinJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowJoinJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
index dddee0c..617e8b8 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowJoinJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowJoinJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
@@ -173,11 +173,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "cnt",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -186,11 +183,8 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "uv",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -314,11 +308,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "cnt",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -327,11 +318,8 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "uv",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -665,11 +653,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "cnt",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -678,11 +663,8 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "uv",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : true,
@@ -806,11 +788,8 @@
     "grouping" : [ 0 ],
     "aggCalls" : [ {
       "name" : "cnt",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ ],
       "filterArg" : -1,
       "distinct" : false,
@@ -819,11 +798,8 @@
       "type" : "BIGINT NOT NULL"
     }, {
       "name" : "uv",
-      "aggFunction" : {
-        "name" : "COUNT",
-        "kind" : "COUNT",
-        "syntax" : "FUNCTION_STAR"
-      },
+      "syntax" : "FUNCTION_STAR",
+      "internalName" : "$COUNT$1",
       "argList" : [ 1 ],
       "filterArg" : -1,
       "distinct" : true,

[flink] 02/02: [hotfix][table-planner] Clean up serde classes

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3d8748316f597e6936442dc514f266ffbcce4bed
Author: Timo Walther <tw...@apache.org>
AuthorDate: Thu Feb 24 14:21:25 2022 +0100

    [hotfix][table-planner] Clean up serde classes
---
 .../plan/abilities/sink/WritingMetadataSpec.java   |  6 ---
 .../abilities/source/SourceAbilitySpecBase.java    |  6 ---
 .../planner/plan/logical/WindowingStrategy.java    |  6 ---
 .../table/planner/plan/nodes/exec/ExecNode.java    |  6 ---
 .../planner/plan/nodes/exec/InputProperty.java     |  6 ---
 .../plan/nodes/exec/common/CommonExecValues.java   | 10 +---
 .../exec/serde/AggregateCallJsonDeserializer.java  | 12 +++--
 .../exec/serde/AggregateCallJsonSerializer.java    | 18 +++----
 .../exec/serde/ChangelogModeJsonDeserializer.java  | 14 ++---
 .../exec/serde/ChangelogModeJsonSerializer.java    | 11 ++--
 .../nodes/exec/serde/ColumnJsonDeserializer.java   | 11 +++-
 .../nodes/exec/serde/ColumnJsonSerializer.java     | 31 ++++++-----
 .../ContextResolvedTableJsonDeserializer.java      | 11 +++-
 .../serde/ContextResolvedTableJsonSerializer.java  | 16 ++++--
 .../nodes/exec/serde/DataTypeJsonDeserializer.java |  4 +-
 .../nodes/exec/serde/DataTypeJsonSerializer.java   | 20 ++++----
 .../exec/serde/ExecNodeGraphJsonDeserializer.java  | 13 +++--
 .../exec/serde/ExecNodeGraphJsonSerializer.java    | 11 ++--
 .../exec/serde/FlinkVersionJsonDeserializer.java   | 12 +++--
 .../exec/serde/FlinkVersionJsonSerializer.java     | 12 +++--
 .../plan/nodes/exec/serde/JsonPlanEdge.java        | 24 +++++----
 .../plan/nodes/exec/serde/JsonPlanGraph.java       | 16 +++---
 .../plan/nodes/exec/serde/JsonSerdeUtil.java       | 24 ++++-----
 .../exec/serde/LogicalTypeJsonDeserializer.java    |  4 +-
 .../exec/serde/LogicalTypeJsonSerializer.java      | 60 +++++++++++-----------
 .../exec/serde/LogicalWindowJsonDeserializer.java  | 11 ++--
 .../exec/serde/LogicalWindowJsonSerializer.java    | 39 +++++++-------
 .../serde/ObjectIdentifierJsonDeserializer.java    | 12 +++--
 .../exec/serde/ObjectIdentifierJsonSerializer.java | 12 +++--
 .../exec/serde/RelDataTypeJsonDeserializer.java    |  4 +-
 .../exec/serde/RelDataTypeJsonSerializer.java      |  4 +-
 .../RequiredDistributionJsonDeserializer.java      | 15 ++++--
 .../serde/RequiredDistributionJsonSerializer.java  | 12 +++--
 .../ResolvedCatalogTableJsonDeserializer.java      | 11 +++-
 .../serde/ResolvedCatalogTableJsonSerializer.java  | 19 ++++---
 .../serde/ResolvedExpressionJsonDeserializer.java  | 11 +++-
 .../serde/ResolvedExpressionJsonSerializer.java    | 19 ++++---
 .../exec/serde/ResolvedSchemaJsonDeserializer.java | 11 +++-
 .../exec/serde/ResolvedSchemaJsonSerializer.java   | 17 ++++--
 .../nodes/exec/serde/RexNodeJsonDeserializer.java  |  4 +-
 .../nodes/exec/serde/RexNodeJsonSerializer.java    | 60 +++++++++++-----------
 .../exec/serde/RexWindowBoundJsonDeserializer.java | 11 ++--
 .../exec/serde/RexWindowBoundJsonSerializer.java   | 28 +++++-----
 .../plan/nodes/exec/serde/SerdeContext.java        |  2 +
 .../nodes/exec/serde/ShuffleJsonDeserializer.java  | 15 ++++--
 .../nodes/exec/serde/ShuffleJsonSerializer.java    | 12 +++--
 .../nodes/exec/serde/UniqueConstraintMixin.java    | 10 ++--
 .../plan/nodes/exec/serde/WatermarkSpecMixin.java  |  6 ++-
 .../serde/WindowReferenceJsonDeserializer.java     |  4 +-
 .../exec/serde/WindowReferenceJsonSerializer.java  |  8 +--
 .../planner/plan/nodes/exec/spec/OverSpec.java     |  9 ----
 .../stream/StreamExecGlobalGroupAggregate.java     |  6 ---
 .../stream/StreamExecGlobalWindowAggregate.java    |  6 ---
 .../stream/StreamExecGroupWindowAggregate.java     |  6 ---
 .../StreamExecIncrementalGroupAggregate.java       |  6 ---
 .../plan/nodes/exec/stream/StreamExecSink.java     |  6 ---
 .../table/planner/plan/utils/LookupJoinUtil.java   |  6 ---
 57 files changed, 416 insertions(+), 350 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/WritingMetadataSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/WritingMetadataSpec.java
index 46a9845..dcdd948 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/WritingMetadataSpec.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/WritingMetadataSpec.java
@@ -21,8 +21,6 @@ package org.apache.flink.table.planner.plan.abilities.sink;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.utils.TypeConversions;
@@ -31,8 +29,6 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCre
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 import java.util.List;
 import java.util.Objects;
@@ -52,8 +48,6 @@ public final class WritingMetadataSpec implements SinkAbilitySpec {
     private final List<String> metadataKeys;
 
     @JsonProperty(FIELD_NAME_CONSUMED_TYPE)
-    @JsonSerialize(using = LogicalTypeJsonSerializer.class)
-    @JsonDeserialize(using = LogicalTypeJsonDeserializer.class)
     private final LogicalType consumedType;
 
     @JsonCreator
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpecBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpecBase.java
index 8764205..1b5cbf3 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpecBase.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpecBase.java
@@ -18,15 +18,11 @@
 
 package org.apache.flink.table.planner.plan.abilities.source;
 
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 import javax.annotation.Nullable;
 
@@ -40,8 +36,6 @@ public abstract class SourceAbilitySpecBase implements SourceAbilitySpec {
     public static final String FIELD_NAME_PRODUCED_TYPE = "producedType";
 
     @JsonProperty(FIELD_NAME_PRODUCED_TYPE)
-    @JsonSerialize(using = LogicalTypeJsonSerializer.class)
-    @JsonDeserialize(using = LogicalTypeJsonDeserializer.class)
     private final @Nullable RowType producedType;
 
     public SourceAbilitySpecBase() {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/WindowingStrategy.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/WindowingStrategy.java
index d825419..47bca8d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/WindowingStrategy.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/WindowingStrategy.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.planner.plan.logical;
 
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 
@@ -27,8 +25,6 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgn
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 /** Logical representation of a windowing strategy. */
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "strategy")
@@ -46,8 +42,6 @@ public abstract class WindowingStrategy {
     protected final WindowSpec window;
 
     @JsonProperty(value = FIELD_NAME_TIME_ATTRIBUTE_TYPE)
-    @JsonSerialize(using = LogicalTypeJsonSerializer.class)
-    @JsonDeserialize(using = LogicalTypeJsonDeserializer.class)
     protected final LogicalType timeAttributeType;
 
     @JsonProperty(FIELD_NAME_IS_ROWTIME)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
index 229305f..81d07b8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
@@ -20,8 +20,6 @@ package org.apache.flink.table.planner.plan.nodes.exec;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer;
 import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor;
 import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -30,8 +28,6 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver;
 
 import java.util.List;
@@ -75,8 +71,6 @@ public interface ExecNode<T> extends ExecNodeTranslator<T> {
      * data structures.
      */
     @JsonProperty(value = FIELD_NAME_OUTPUT_TYPE)
-    @JsonSerialize(using = LogicalTypeJsonSerializer.class)
-    @JsonDeserialize(using = LogicalTypeJsonDeserializer.class)
     LogicalType getOutputType();
 
     /**
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/InputProperty.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/InputProperty.java
index d4ecbcf..a4c8205 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/InputProperty.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/InputProperty.java
@@ -20,14 +20,10 @@ package org.apache.flink.table.planner.plan.nodes.exec;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.operators.Input;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.RequiredDistributionJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.RequiredDistributionJsonSerializer;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 import java.util.Arrays;
 import java.util.Objects;
@@ -82,8 +78,6 @@ public class InputProperty {
      * corresponding input.
      */
     @JsonProperty(FIELD_NAME_REQUIRED_DISTRIBUTION)
-    @JsonSerialize(using = RequiredDistributionJsonSerializer.class)
-    @JsonDeserialize(using = RequiredDistributionJsonDeserializer.class)
     private final RequiredDistribution requiredDistribution;
 
     /** How does the input record trigger the output behavior of the target {@link ExecNode}. */
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecValues.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecValues.java
index 5a47186..21be27f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecValues.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecValues.java
@@ -27,14 +27,12 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer;
 import org.apache.flink.table.runtime.operators.values.ValuesInputFormat;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
 import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
 
 import java.util.Collections;
 import java.util.List;
@@ -78,12 +76,8 @@ public abstract class CommonExecValues extends ExecNodeBase<RowData>
         return transformation;
     }
 
-    /**
-     * In order to use {@link RexNodeJsonSerializer} to serialize {@link RexLiteral}, so we force
-     * cast element of tuples to {@link RexNode} which is the parent class of {@link RexLiteral}.
-     */
     @JsonProperty(value = FIELD_NAME_TUPLES)
-    public List<List<RexNode>> getTuples() {
-        return (List<List<RexNode>>) (Object) tuples;
+    public List<List<RexLiteral>> getTuples() {
+        return tuples;
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonDeserializer.java
index 28eef53..6697534 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonDeserializer.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
+
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -41,13 +43,15 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCall
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_TYPE;
 
 /**
- * JSON deserializer for {@link AggregateCall}. refer to {@link AggregateCallJsonSerializer} for
- * serializer.
+ * JSON deserializer for {@link AggregateCall}.
+ *
+ * @see AggregateCallJsonSerializer for the reverse operation
  */
-public class AggregateCallJsonDeserializer extends StdDeserializer<AggregateCall> {
+@Internal
+final class AggregateCallJsonDeserializer extends StdDeserializer<AggregateCall> {
     private static final long serialVersionUID = 1L;
 
-    public AggregateCallJsonDeserializer() {
+    AggregateCallJsonDeserializer() {
         super(AggregateCall.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java
index 35e3ac9..345400f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java
@@ -37,18 +37,18 @@ import java.io.IOException;
  * @see AggregateCallJsonDeserializer for the reverse operation
  */
 @Internal
-public class AggregateCallJsonSerializer extends StdSerializer<AggregateCall> {
+final class AggregateCallJsonSerializer extends StdSerializer<AggregateCall> {
     private static final long serialVersionUID = 1L;
 
-    public static final String FIELD_NAME_NAME = "name";
-    public static final String FIELD_NAME_ARG_LIST = "argList";
-    public static final String FIELD_NAME_FILTER_ARG = "filterArg";
-    public static final String FIELD_NAME_DISTINCT = "distinct";
-    public static final String FIELD_NAME_APPROXIMATE = "approximate";
-    public static final String FIELD_NAME_IGNORE_NULLS = "ignoreNulls";
-    public static final String FIELD_NAME_TYPE = "type";
+    static final String FIELD_NAME_NAME = "name";
+    static final String FIELD_NAME_ARG_LIST = "argList";
+    static final String FIELD_NAME_FILTER_ARG = "filterArg";
+    static final String FIELD_NAME_DISTINCT = "distinct";
+    static final String FIELD_NAME_APPROXIMATE = "approximate";
+    static final String FIELD_NAME_IGNORE_NULLS = "ignoreNulls";
+    static final String FIELD_NAME_TYPE = "type";
 
-    public AggregateCallJsonSerializer() {
+    AggregateCallJsonSerializer() {
         super(AggregateCall.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonDeserializer.java
index b3c7351..e6590a7 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonDeserializer.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.types.RowKind;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
@@ -30,20 +30,22 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std
 import java.io.IOException;
 
 /**
- * JSON deserializer for {@link ChangelogMode}. refer to {@link ChangelogModeJsonSerializer} for
- * serializer.
+ * JSON deserializer for {@link ChangelogMode}.
+ *
+ * @see ChangelogModeJsonSerializer for the reverse operation
  */
-public class ChangelogModeJsonDeserializer extends StdDeserializer<ChangelogMode> {
+@Internal
+final class ChangelogModeJsonDeserializer extends StdDeserializer<ChangelogMode> {
     private static final long serialVersionUID = 1L;
 
-    public ChangelogModeJsonDeserializer() {
+    ChangelogModeJsonDeserializer() {
         super(ChangelogMode.class);
     }
 
     @Override
     public ChangelogMode deserialize(
             JsonParser jsonParser, DeserializationContext deserializationContext)
-            throws IOException, JsonProcessingException {
+            throws IOException {
         ChangelogMode.Builder builder = ChangelogMode.newBuilder();
         JsonNode rowKindsNode = jsonParser.readValueAsTree();
         for (JsonNode rowKindNode : rowKindsNode) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonSerializer.java
index dbd5a4a..8d0c462 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.types.RowKind;
 
@@ -28,13 +29,15 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
 import java.io.IOException;
 
 /**
- * JSON serializer for {@link ChangelogMode}. refer to {@link ChangelogModeJsonDeserializer} for
- * deserializer.
+ * JSON serializer for {@link ChangelogMode}.
+ *
+ * @see ChangelogModeJsonDeserializer for the reverse operation
  */
-public class ChangelogModeJsonSerializer extends StdSerializer<ChangelogMode> {
+@Internal
+final class ChangelogModeJsonSerializer extends StdSerializer<ChangelogMode> {
     private static final long serialVersionUID = 1L;
 
-    public ChangelogModeJsonSerializer() {
+    ChangelogModeJsonSerializer() {
         super(ChangelogMode.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonDeserializer.java
index 15ba34a..6cbce5d3 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonDeserializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.expressions.ResolvedExpression;
@@ -47,12 +48,18 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSer
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.deserializeOptionalField;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.traverse;
 
-class ColumnJsonDeserializer extends StdDeserializer<Column> {
+/**
+ * JSON deserializer for {@link Column}.
+ *
+ * @see ColumnJsonSerializer for the reverse operation
+ */
+@Internal
+final class ColumnJsonDeserializer extends StdDeserializer<Column> {
 
     private static final String SUPPORTED_KINDS =
             Arrays.toString(new String[] {KIND_PHYSICAL, KIND_COMPUTED, KIND_METADATA});
 
-    public ColumnJsonDeserializer() {
+    ColumnJsonDeserializer() {
         super(Column.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonSerializer.java
index 3d6ded0..4721512 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.Column;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -28,20 +29,26 @@ import java.io.IOException;
 
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.serializeOptionalField;
 
-class ColumnJsonSerializer extends StdSerializer<Column> {
+/**
+ * JSON serializer for {@link Column}.
+ *
+ * @see ColumnJsonDeserializer for the reverse operation
+ */
+@Internal
+final class ColumnJsonSerializer extends StdSerializer<Column> {
 
-    public static final String KIND = "kind";
-    public static final String KIND_PHYSICAL = "PHYSICAL";
-    public static final String KIND_COMPUTED = "COMPUTED";
-    public static final String KIND_METADATA = "METADATA";
-    public static final String NAME = "name";
-    public static final String DATA_TYPE = "dataType";
-    public static final String COMMENT = "comment";
-    public static final String EXPRESSION = "expression";
-    public static final String METADATA_KEY = "metadataKey";
-    public static final String IS_VIRTUAL = "isVirtual";
+    static final String KIND = "kind";
+    static final String KIND_PHYSICAL = "PHYSICAL";
+    static final String KIND_COMPUTED = "COMPUTED";
+    static final String KIND_METADATA = "METADATA";
+    static final String NAME = "name";
+    static final String DATA_TYPE = "dataType";
+    static final String COMMENT = "comment";
+    static final String EXPRESSION = "expression";
+    static final String METADATA_KEY = "metadataKey";
+    static final String IS_VIRTUAL = "isVirtual";
 
-    public ColumnJsonSerializer() {
+    ColumnJsonSerializer() {
         super(Column.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonDeserializer.java
index c76091d..0828840 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonDeserializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanCompilation;
 import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanRestore;
@@ -45,10 +46,16 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.ContextResolv
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.ContextResolvedTableJsonSerializer.FIELD_NAME_IDENTIFIER;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedCatalogTableJsonSerializer.OPTIONS;
 
-class ContextResolvedTableJsonDeserializer extends StdDeserializer<ContextResolvedTable> {
+/**
+ * JSON deserializer for {@link ContextResolvedTable}.
+ *
+ * @see ContextResolvedTableJsonSerializer for the reverse operation
+ */
+@Internal
+final class ContextResolvedTableJsonDeserializer extends StdDeserializer<ContextResolvedTable> {
     private static final long serialVersionUID = 1L;
 
-    public ContextResolvedTableJsonDeserializer() {
+    ContextResolvedTableJsonDeserializer() {
         super(ContextResolvedTable.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonSerializer.java
index 0b5d98e..5101198 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanCompilation;
@@ -30,14 +31,19 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
 
 import java.io.IOException;
 
-/** JSON serializer for {@link ContextResolvedTable}. */
-class ContextResolvedTableJsonSerializer extends StdSerializer<ContextResolvedTable> {
+/**
+ * JSON serializer for {@link ContextResolvedTable}.
+ *
+ * @see ContextResolvedTableJsonDeserializer for the reverse operation
+ */
+@Internal
+final class ContextResolvedTableJsonSerializer extends StdSerializer<ContextResolvedTable> {
     private static final long serialVersionUID = 1L;
 
-    public static final String FIELD_NAME_IDENTIFIER = "identifier";
-    public static final String FIELD_NAME_CATALOG_TABLE = "resolvedTable";
+    static final String FIELD_NAME_IDENTIFIER = "identifier";
+    static final String FIELD_NAME_CATALOG_TABLE = "resolvedTable";
 
-    public ContextResolvedTableJsonSerializer() {
+    ContextResolvedTableJsonSerializer() {
         super(ContextResolvedTable.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonDeserializer.java
index 3a234b2..21f456e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonDeserializer.java
@@ -59,9 +59,9 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil
  * @see DataTypeJsonSerializer for the reverse operation
  */
 @Internal
-public class DataTypeJsonDeserializer extends StdDeserializer<DataType> {
+final class DataTypeJsonDeserializer extends StdDeserializer<DataType> {
 
-    public DataTypeJsonDeserializer() {
+    DataTypeJsonDeserializer() {
         super(DataType.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerializer.java
index 53938a8..8593493 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerializer.java
@@ -41,7 +41,7 @@ import static org.apache.flink.table.types.utils.DataTypeUtils.isInternal;
  * @see DataTypeJsonDeserializer for the reverse operation
  */
 @Internal
-public final class DataTypeJsonSerializer extends StdSerializer<DataType> {
+final class DataTypeJsonSerializer extends StdSerializer<DataType> {
     private static final long serialVersionUID = 1L;
 
     /*
@@ -83,22 +83,22 @@ public final class DataTypeJsonSerializer extends StdSerializer<DataType> {
      */
 
     // Common fields
-    public static final String FIELD_NAME_TYPE = "logicalType";
-    public static final String FIELD_NAME_CONVERSION_CLASS = "conversionClass";
+    static final String FIELD_NAME_TYPE = "logicalType";
+    static final String FIELD_NAME_CONVERSION_CLASS = "conversionClass";
 
     // ARRAY, MULTISET
-    public static final String FIELD_NAME_ELEMENT_CLASS = "elementClass";
+    static final String FIELD_NAME_ELEMENT_CLASS = "elementClass";
 
     // MAP
-    public static final String FIELD_NAME_KEY_CLASS = "keyClass";
-    public static final String FIELD_NAME_VALUE_CLASS = "valueClass";
+    static final String FIELD_NAME_KEY_CLASS = "keyClass";
+    static final String FIELD_NAME_VALUE_CLASS = "valueClass";
 
     // ROW, STRUCTURED_TYPE, DISTINCT_TYPE
-    public static final String FIELD_NAME_FIELDS = "fields";
-    public static final String FIELD_NAME_FIELD_NAME = "name";
-    public static final String FIELD_NAME_FIELD_CLASS = "fieldClass";
+    static final String FIELD_NAME_FIELDS = "fields";
+    static final String FIELD_NAME_FIELD_NAME = "name";
+    static final String FIELD_NAME_FIELD_CLASS = "fieldClass";
 
-    public DataTypeJsonSerializer() {
+    DataTypeJsonSerializer() {
         super(DataType.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonDeserializer.java
index 4fef3bb..f353012 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonDeserializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
@@ -27,13 +28,17 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std
 import java.io.IOException;
 
 /**
- * JSON deserializer for {@link ExecNodeGraph}. It uses the intermediate representation {@link
- * JsonPlanGraph}.
+ * JSON deserializer for {@link ExecNodeGraph}.
+ *
+ * <p>It uses the intermediate representation {@link JsonPlanGraph}.
+ *
+ * @see ExecNodeGraphJsonSerializer for the reverse operation
  */
-public class ExecNodeGraphJsonDeserializer extends StdDeserializer<ExecNodeGraph> {
+@Internal
+final class ExecNodeGraphJsonDeserializer extends StdDeserializer<ExecNodeGraph> {
     private static final long serialVersionUID = 1L;
 
-    public ExecNodeGraphJsonDeserializer() {
+    ExecNodeGraphJsonDeserializer() {
         super(ExecNodeGraph.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonSerializer.java
index c424f91..752f482 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphValidator;
 import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor;
@@ -29,13 +30,15 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
 import java.io.IOException;
 
 /**
- * JSON serializer for {@link ExecNodeGraph}. It uses the intermediate representation {@link
- * JsonPlanGraph}.
+ * JSON serializer for {@link ExecNodeGraph}.
+ *
+ * @see ExecNodeGraphJsonDeserializer for the reverse operation
  */
-public class ExecNodeGraphJsonSerializer extends StdSerializer<ExecNodeGraph> {
+@Internal
+final class ExecNodeGraphJsonSerializer extends StdSerializer<ExecNodeGraph> {
     private static final long serialVersionUID = 1L;
 
-    public ExecNodeGraphJsonSerializer() {
+    ExecNodeGraphJsonSerializer() {
         super(ExecNodeGraph.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/FlinkVersionJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/FlinkVersionJsonDeserializer.java
index 968b93c..7f2fb0a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/FlinkVersionJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/FlinkVersionJsonDeserializer.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
 import org.apache.flink.FlinkVersion;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
@@ -27,11 +28,16 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std
 
 import java.io.IOException;
 
-/** JSON deserializer for {@link FlinkVersion}. */
-public class FlinkVersionJsonDeserializer extends StdDeserializer<FlinkVersion> {
+/**
+ * JSON deserializer for {@link FlinkVersion}.
+ *
+ * @see FlinkVersionJsonSerializer for the reverse operation
+ */
+@Internal
+final class FlinkVersionJsonDeserializer extends StdDeserializer<FlinkVersion> {
     private static final long serialVersionUID = 1L;
 
-    public FlinkVersionJsonDeserializer() {
+    FlinkVersionJsonDeserializer() {
         super(FlinkVersion.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/FlinkVersionJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/FlinkVersionJsonSerializer.java
index 3420314..b43c578 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/FlinkVersionJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/FlinkVersionJsonSerializer.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
 import org.apache.flink.FlinkVersion;
+import org.apache.flink.annotation.Internal;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
@@ -26,11 +27,16 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
 
 import java.io.IOException;
 
-/** JSON serializer for {@link FlinkVersion}. */
-public class FlinkVersionJsonSerializer extends StdSerializer<FlinkVersion> {
+/**
+ * JSON serializer for {@link FlinkVersion}.
+ *
+ * @see FlinkVersionJsonDeserializer for the reverse operation
+ */
+@Internal
+final class FlinkVersionJsonSerializer extends StdSerializer<FlinkVersion> {
     private static final long serialVersionUID = 1L;
 
-    public FlinkVersionJsonSerializer() {
+    FlinkVersionJsonSerializer() {
         super(FlinkVersion.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonPlanEdge.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonPlanEdge.java
index 17c5c5e..f5401b6 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonPlanEdge.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonPlanEdge.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
@@ -36,12 +37,13 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotatio
  *
  * <p>This model is used only during serialization/deserialization.
  */
+@Internal
 @JsonIgnoreProperties(ignoreUnknown = true)
-class JsonPlanEdge {
-    public static final String FIELD_NAME_SOURCE = "source";
-    public static final String FIELD_NAME_TARGET = "target";
-    public static final String FIELD_NAME_SHUFFLE = "shuffle";
-    public static final String FIELD_NAME_SHUFFLE_MODE = "shuffleMode";
+final class JsonPlanEdge {
+    static final String FIELD_NAME_SOURCE = "source";
+    static final String FIELD_NAME_TARGET = "target";
+    static final String FIELD_NAME_SHUFFLE = "shuffle";
+    static final String FIELD_NAME_SHUFFLE_MODE = "shuffleMode";
 
     /** The source node id of this edge. */
     @JsonProperty(FIELD_NAME_SOURCE)
@@ -59,7 +61,7 @@ class JsonPlanEdge {
     private final StreamExchangeMode exchangeMode;
 
     @JsonCreator
-    public JsonPlanEdge(
+    JsonPlanEdge(
             @JsonProperty(FIELD_NAME_SOURCE) int sourceId,
             @JsonProperty(FIELD_NAME_TARGET) int targetId,
             @JsonProperty(FIELD_NAME_SHUFFLE) ExecEdge.Shuffle shuffle,
@@ -70,24 +72,24 @@ class JsonPlanEdge {
         this.exchangeMode = exchangeMode;
     }
 
-    public int getSourceId() {
+    int getSourceId() {
         return sourceId;
     }
 
-    public int getTargetId() {
+    int getTargetId() {
         return targetId;
     }
 
-    public ExecEdge.Shuffle getShuffle() {
+    ExecEdge.Shuffle getShuffle() {
         return shuffle;
     }
 
-    public StreamExchangeMode getExchangeMode() {
+    StreamExchangeMode getExchangeMode() {
         return exchangeMode;
     }
 
     /** Build {@link JsonPlanEdge} from an {@link ExecEdge}. */
-    public static JsonPlanEdge fromExecEdge(ExecEdge execEdge) {
+    static JsonPlanEdge fromExecEdge(ExecEdge execEdge) {
         return new JsonPlanEdge(
                 execEdge.getSource().getId(),
                 execEdge.getTarget().getId(),
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonPlanGraph.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonPlanGraph.java
index 9e6278c..4dd4a93 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonPlanGraph.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonPlanGraph.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
 import org.apache.flink.FlinkVersion;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
@@ -48,10 +49,11 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  *
  * <p>This model is used only during serialization/deserialization.
  */
-class JsonPlanGraph {
-    public static final String FIELD_NAME_FLINK_VERSION = "flinkVersion";
-    public static final String FIELD_NAME_NODES = "nodes";
-    public static final String FIELD_NAME_EDGES = "edges";
+@Internal
+final class JsonPlanGraph {
+    static final String FIELD_NAME_FLINK_VERSION = "flinkVersion";
+    static final String FIELD_NAME_NODES = "nodes";
+    static final String FIELD_NAME_EDGES = "edges";
 
     @JsonProperty(FIELD_NAME_FLINK_VERSION)
     private final FlinkVersion flinkVersion;
@@ -63,7 +65,7 @@ class JsonPlanGraph {
     private final List<JsonPlanEdge> edges;
 
     @JsonCreator
-    public JsonPlanGraph(
+    JsonPlanGraph(
             @JsonProperty(FIELD_NAME_FLINK_VERSION) FlinkVersion flinkVersion,
             @JsonProperty(FIELD_NAME_NODES) List<ExecNode<?>> nodes,
             @JsonProperty(FIELD_NAME_EDGES) List<JsonPlanEdge> edges) {
@@ -72,7 +74,7 @@ class JsonPlanGraph {
         this.edges = edges;
     }
 
-    public static JsonPlanGraph fromExecNodeGraph(ExecNodeGraph execGraph) {
+    static JsonPlanGraph fromExecNodeGraph(ExecNodeGraph execGraph) {
         final List<ExecNode<?>> allNodes = new ArrayList<>();
         final List<JsonPlanEdge> allEdges = new ArrayList<>();
         final Set<Integer> nodesIds = new HashSet<>();
@@ -112,7 +114,7 @@ class JsonPlanGraph {
         return new JsonPlanGraph(execGraph.getFlinkVersion(), allNodes, allEdges);
     }
 
-    public ExecNodeGraph convertToExecNodeGraph() {
+    ExecNodeGraph convertToExecNodeGraph() {
         Map<Integer, ExecNode<?>> idToExecNodes = new HashMap<>();
         for (ExecNode<?> execNode : nodes) {
             int id = execNode.getId();
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
index 7e25612..92bec58 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
 import org.apache.flink.FlinkVersion;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ContextResolvedTable;
@@ -31,11 +32,13 @@ import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.planner.plan.logical.LogicalWindow;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty.RequiredDistribution;
 import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
 import org.apache.flink.table.runtime.groupwindow.WindowReference;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.extraction.ExtractionUtils;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -69,7 +72,8 @@ import java.lang.annotation.Annotation;
 import java.lang.reflect.Constructor;
 import java.util.Optional;
 
-/** An utility class that provide abilities for JSON serialization and deserialization. */
+/** A utility class that provides abilities for JSON serialization and deserialization. */
+@Internal
 public class JsonSerdeUtil {
 
     /** Return true if the given class's constructors have @JsonCreator annotation, else false. */
@@ -124,7 +128,7 @@ public class JsonSerdeUtil {
 
     private static Module createFlinkTableJacksonModule() {
         final SimpleModule module = new SimpleModule("Flink table module");
-        ExecNodeMetadataUtil.execNodes().stream()
+        ExecNodeMetadataUtil.execNodes()
                 .forEach(c -> module.registerSubtypes(new NamedType(c, c.getName())));
         registerSerializers(module);
         registerDeserializers(module);
@@ -136,15 +140,10 @@ public class JsonSerdeUtil {
     private static void registerSerializers(SimpleModule module) {
         module.addSerializer(new ExecNodeGraphJsonSerializer());
         module.addSerializer(new FlinkVersionJsonSerializer());
-        // ObjectIdentifierJsonSerializer is needed for LogicalType serialization
         module.addSerializer(new ObjectIdentifierJsonSerializer());
-        // LogicalTypeJsonSerializer is needed for RelDataType serialization
         module.addSerializer(new LogicalTypeJsonSerializer());
-        // DataTypeJsonSerializer is needed for LogicalType serialization
         module.addSerializer(new DataTypeJsonSerializer());
-        // RelDataTypeJsonSerializer is needed for RexNode serialization
         module.addSerializer(new RelDataTypeJsonSerializer());
-        // RexNode is used in many exec nodes, so we register its serializer directly here
         module.addSerializer(new RexNodeJsonSerializer());
         module.addSerializer(new AggregateCallJsonSerializer());
         module.addSerializer(new ChangelogModeJsonSerializer());
@@ -156,24 +155,19 @@ public class JsonSerdeUtil {
         module.addSerializer(new ResolvedCatalogTableJsonSerializer());
         module.addSerializer(new ResolvedExpressionJsonSerializer());
         module.addSerializer(new ResolvedSchemaJsonSerializer());
+        module.addSerializer(new RequiredDistributionJsonSerializer());
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
     private static void registerDeserializers(SimpleModule module) {
         module.addDeserializer(ExecNodeGraph.class, new ExecNodeGraphJsonDeserializer());
         module.addDeserializer(FlinkVersion.class, new FlinkVersionJsonDeserializer());
-        // ObjectIdentifierJsonDeserializer is needed for LogicalType deserialization
         module.addDeserializer(ObjectIdentifier.class, new ObjectIdentifierJsonDeserializer());
-        // LogicalTypeJsonSerializer is needed for RelDataType serialization
         module.addDeserializer(LogicalType.class, new LogicalTypeJsonDeserializer());
-        // DataTypeJsonDeserializer is needed for LogicalType serialization
+        module.addDeserializer(RowType.class, (StdDeserializer) new LogicalTypeJsonDeserializer());
         module.addDeserializer(DataType.class, new DataTypeJsonDeserializer());
-        // RelDataTypeJsonSerializer is needed for RexNode serialization
         module.addDeserializer(RelDataType.class, new RelDataTypeJsonDeserializer());
-        // RexNode is used in many exec nodes, so we register its deserializer directly here
         module.addDeserializer(RexNode.class, new RexNodeJsonDeserializer());
-        // We need this explicit mapping to make sure Jackson can deserialize POJOs declaring fields
-        // with RexLiteral instead of RexNode.
         module.addDeserializer(RexLiteral.class, (StdDeserializer) new RexNodeJsonDeserializer());
         module.addDeserializer(AggregateCall.class, new AggregateCallJsonDeserializer());
         module.addDeserializer(ChangelogMode.class, new ChangelogModeJsonDeserializer());
@@ -187,6 +181,8 @@ public class JsonSerdeUtil {
                 ResolvedCatalogTable.class, new ResolvedCatalogTableJsonDeserializer());
         module.addDeserializer(ResolvedExpression.class, new ResolvedExpressionJsonDeserializer());
         module.addDeserializer(ResolvedSchema.class, new ResolvedSchemaJsonDeserializer());
+        module.addDeserializer(
+                RequiredDistribution.class, new RequiredDistributionJsonDeserializer());
     }
 
     private static void registerMixins(SimpleModule module) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonDeserializer.java
index df48ca4..99ee1e9 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonDeserializer.java
@@ -97,10 +97,10 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJs
  * @see LogicalTypeJsonSerializer for the reverse operation
  */
 @Internal
-public class LogicalTypeJsonDeserializer extends StdDeserializer<LogicalType> {
+final class LogicalTypeJsonDeserializer extends StdDeserializer<LogicalType> {
     private static final long serialVersionUID = 1L;
 
-    public LogicalTypeJsonDeserializer() {
+    LogicalTypeJsonDeserializer() {
         super(LogicalType.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerializer.java
index 8507984..390ef6e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerializer.java
@@ -64,56 +64,56 @@ import java.io.IOException;
  * @see LogicalTypeJsonDeserializer for the reverse operation.
  */
 @Internal
-public final class LogicalTypeJsonSerializer extends StdSerializer<LogicalType> {
+final class LogicalTypeJsonSerializer extends StdSerializer<LogicalType> {
     private static final long serialVersionUID = 1L;
 
     // Common fields
-    public static final String FIELD_NAME_TYPE_NAME = "type";
-    public static final String FIELD_NAME_NULLABLE = "nullable";
-    public static final String FIELD_NAME_DESCRIPTION = "description";
+    static final String FIELD_NAME_TYPE_NAME = "type";
+    static final String FIELD_NAME_NULLABLE = "nullable";
+    static final String FIELD_NAME_DESCRIPTION = "description";
 
     // CHAR, VARCHAR, BINARY, VARBINARY
-    public static final String FIELD_NAME_LENGTH = "length";
+    static final String FIELD_NAME_LENGTH = "length";
 
     // TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE
-    public static final String FIELD_NAME_PRECISION = "precision";
-    public static final String FIELD_NAME_TIMESTAMP_KIND = "kind";
+    static final String FIELD_NAME_PRECISION = "precision";
+    static final String FIELD_NAME_TIMESTAMP_KIND = "kind";
 
     // ARRAY, MULTISET
-    public static final String FIELD_NAME_ELEMENT_TYPE = "elementType";
+    static final String FIELD_NAME_ELEMENT_TYPE = "elementType";
 
     // MAP
-    public static final String FIELD_NAME_KEY_TYPE = "keyType";
-    public static final String FIELD_NAME_VALUE_TYPE = "valueType";
+    static final String FIELD_NAME_KEY_TYPE = "keyType";
+    static final String FIELD_NAME_VALUE_TYPE = "valueType";
 
     // ROW
-    public static final String FIELD_NAME_FIELDS = "fields";
-    public static final String FIELD_NAME_FIELD_NAME = "name";
-    public static final String FIELD_NAME_FIELD_TYPE = "fieldType";
-    public static final String FIELD_NAME_FIELD_DESCRIPTION = "description";
+    static final String FIELD_NAME_FIELDS = "fields";
+    static final String FIELD_NAME_FIELD_NAME = "name";
+    static final String FIELD_NAME_FIELD_TYPE = "fieldType";
+    static final String FIELD_NAME_FIELD_DESCRIPTION = "description";
 
     // DISTINCT_TYPE
-    public static final String FIELD_NAME_SOURCE_TYPE = "sourceType";
+    static final String FIELD_NAME_SOURCE_TYPE = "sourceType";
 
     // STRUCTURED_TYPE
-    public static final String FIELD_NAME_OBJECT_IDENTIFIER = "objectIdentifier";
-    public static final String FIELD_NAME_IMPLEMENTATION_CLASS = "implementationClass";
-    public static final String FIELD_NAME_ATTRIBUTES = "attributes";
-    public static final String FIELD_NAME_ATTRIBUTE_NAME = "name";
-    public static final String FIELD_NAME_ATTRIBUTE_TYPE = "attributeType";
-    public static final String FIELD_NAME_ATTRIBUTE_DESCRIPTION = "description";
-    public static final String FIELD_NAME_FINAL = "final";
-    public static final String FIELD_NAME_INSTANTIABLE = "instantiable";
-    public static final String FIELD_NAME_COMPARISON = "comparison";
-    public static final String FIELD_NAME_SUPER_TYPE = "superType";
+    static final String FIELD_NAME_OBJECT_IDENTIFIER = "objectIdentifier";
+    static final String FIELD_NAME_IMPLEMENTATION_CLASS = "implementationClass";
+    static final String FIELD_NAME_ATTRIBUTES = "attributes";
+    static final String FIELD_NAME_ATTRIBUTE_NAME = "name";
+    static final String FIELD_NAME_ATTRIBUTE_TYPE = "attributeType";
+    static final String FIELD_NAME_ATTRIBUTE_DESCRIPTION = "description";
+    static final String FIELD_NAME_FINAL = "final";
+    static final String FIELD_NAME_INSTANTIABLE = "instantiable";
+    static final String FIELD_NAME_COMPARISON = "comparison";
+    static final String FIELD_NAME_SUPER_TYPE = "superType";
 
     // RAW
-    public static final String FIELD_NAME_CLASS = "class";
-    public static final String FIELD_NAME_EXTERNAL_DATA_TYPE = "externalDataType";
-    public static final String FIELD_NAME_SPECIAL_SERIALIZER = "specialSerializer";
-    public static final String FIELD_VALUE_EXTERNAL_SERIALIZER_NULL = "NULL";
+    static final String FIELD_NAME_CLASS = "class";
+    static final String FIELD_NAME_EXTERNAL_DATA_TYPE = "externalDataType";
+    static final String FIELD_NAME_SPECIAL_SERIALIZER = "specialSerializer";
+    static final String FIELD_VALUE_EXTERNAL_SERIALIZER_NULL = "NULL";
 
-    public LogicalTypeJsonSerializer() {
+    LogicalTypeJsonSerializer() {
         super(LogicalType.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonDeserializer.java
index d6d9151..3a0282b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonDeserializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
@@ -54,13 +55,15 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalWindow
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalWindowJsonSerializer.KIND_TUMBLING;
 
 /**
- * JSON deserializer for {@link LogicalWindow}, refer to {@link LogicalWindowJsonSerializer} for
- * serializer.
+ * JSON deserializer for {@link LogicalWindow}.
+ *
+ * @see LogicalWindowJsonSerializer for the reverse operation
  */
-public class LogicalWindowJsonDeserializer extends StdDeserializer<LogicalWindow> {
+@Internal
+final class LogicalWindowJsonDeserializer extends StdDeserializer<LogicalWindow> {
     private static final long serialVersionUID = 1L;
 
-    public LogicalWindowJsonDeserializer() {
+    LogicalWindowJsonDeserializer() {
         super(LogicalWindow.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonSerializer.java
index 2195b5f..518e7d2 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
@@ -39,31 +40,33 @@ import static org.apache.flink.table.planner.plan.utils.AggregateUtil.toDuration
 import static org.apache.flink.table.planner.plan.utils.AggregateUtil.toLong;
 
 /**
- * JSON serializer for {@link LogicalWindow}, refer to {@link LogicalWindowJsonDeserializer} for
- * deserializer.
+ * JSON serializer for {@link LogicalWindow}.
+ *
+ * @see LogicalWindowJsonDeserializer for the reverse operation
  */
-public class LogicalWindowJsonSerializer extends StdSerializer<LogicalWindow> {
+@Internal
+final class LogicalWindowJsonSerializer extends StdSerializer<LogicalWindow> {
     private static final long serialVersionUID = 1L;
 
-    public static final String FIELD_NAME_KIND = "kind";
-    public static final String KIND_TUMBLING = "TUMBLING";
-    public static final String KIND_SLIDING = "SLIDING";
-    public static final String KIND_SESSION = "SESSION";
+    static final String FIELD_NAME_KIND = "kind";
+    static final String KIND_TUMBLING = "TUMBLING";
+    static final String KIND_SLIDING = "SLIDING";
+    static final String KIND_SESSION = "SESSION";
 
-    public static final String FIELD_NAME_ALIAS = "alias";
-    public static final String FIELD_NAME_TIME_FIELD = "timeField";
-    public static final String FIELD_NAME_FIELD_NAME = "fieldName";
-    public static final String FIELD_NAME_FIELD_INDEX = "fieldIndex";
-    public static final String FIELD_NAME_INPUT_INDEX = "inputIndex";
-    public static final String FIELD_NAME_FIELD_TYPE = "fieldType";
+    static final String FIELD_NAME_ALIAS = "alias";
+    static final String FIELD_NAME_TIME_FIELD = "timeField";
+    static final String FIELD_NAME_FIELD_NAME = "fieldName";
+    static final String FIELD_NAME_FIELD_INDEX = "fieldIndex";
+    static final String FIELD_NAME_INPUT_INDEX = "inputIndex";
+    static final String FIELD_NAME_FIELD_TYPE = "fieldType";
 
-    public static final String FIELD_NAME_SIZE = "size";
-    public static final String FIELD_NAME_IS_TIME_WINDOW = "isTimeWindow";
+    static final String FIELD_NAME_SIZE = "size";
+    static final String FIELD_NAME_IS_TIME_WINDOW = "isTimeWindow";
 
-    public static final String FIELD_NAME_SLIDE = "slide";
-    public static final String FIELD_NAME_GAP = "gap";
+    static final String FIELD_NAME_SLIDE = "slide";
+    static final String FIELD_NAME_GAP = "gap";
 
-    public LogicalWindowJsonSerializer() {
+    LogicalWindowJsonSerializer() {
         super(LogicalWindow.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonDeserializer.java
index 0492d44..9285483 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonDeserializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
@@ -28,11 +29,16 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std
 
 import java.io.IOException;
 
-/** JSON deserializer for {@link ObjectIdentifier}. */
-public class ObjectIdentifierJsonDeserializer extends StdDeserializer<ObjectIdentifier> {
+/**
+ * JSON deserializer for {@link ObjectIdentifier}.
+ *
+ * @see ObjectIdentifierJsonSerializer for the reverse operation
+ */
+@Internal
+final class ObjectIdentifierJsonDeserializer extends StdDeserializer<ObjectIdentifier> {
     private static final long serialVersionUID = 1L;
 
-    public ObjectIdentifierJsonDeserializer() {
+    ObjectIdentifierJsonDeserializer() {
         super(ObjectIdentifier.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonSerializer.java
index d14e37d..fb23dcd 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -26,11 +27,16 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
 
 import java.io.IOException;
 
-/** JSON serializer for {@link ObjectIdentifier}. */
-public class ObjectIdentifierJsonSerializer extends StdSerializer<ObjectIdentifier> {
+/**
+ * JSON serializer for {@link ObjectIdentifier}.
+ *
+ * @see ObjectIdentifierJsonDeserializer for the reverse operation
+ */
+@Internal
+final class ObjectIdentifierJsonSerializer extends StdSerializer<ObjectIdentifier> {
     private static final long serialVersionUID = 1L;
 
-    public ObjectIdentifierJsonSerializer() {
+    ObjectIdentifierJsonSerializer() {
         super(ObjectIdentifier.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonDeserializer.java
index b1c8cd0..467fc9e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonDeserializer.java
@@ -38,10 +38,10 @@ import java.io.IOException;
  * @see RelDataTypeJsonSerializer for the reverse operation
  */
 @Internal
-public class RelDataTypeJsonDeserializer extends StdDeserializer<RelDataType> {
+final class RelDataTypeJsonDeserializer extends StdDeserializer<RelDataType> {
     private static final long serialVersionUID = 1L;
 
-    public RelDataTypeJsonDeserializer() {
+    RelDataTypeJsonDeserializer() {
         super(RelDataType.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerializer.java
index 5d76abb..da86cc5 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerializer.java
@@ -37,10 +37,10 @@ import java.io.IOException;
  * @see RelDataTypeJsonDeserializer for the reverse operation.
  */
 @Internal
-public final class RelDataTypeJsonSerializer extends StdSerializer<RelDataType> {
+final class RelDataTypeJsonSerializer extends StdSerializer<RelDataType> {
     private static final long serialVersionUID = 1L;
 
-    public RelDataTypeJsonSerializer() {
+    RelDataTypeJsonSerializer() {
         super(RelDataType.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonDeserializer.java
index 24db15a..cbbb7a6 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonDeserializer.java
@@ -18,30 +18,35 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty.DistributionType;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty.RequiredDistribution;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
 
 import java.io.IOException;
 
-/** JSON deserializer for {@link RequiredDistribution}. */
-public class RequiredDistributionJsonDeserializer extends StdDeserializer<RequiredDistribution> {
+/**
+ * JSON deserializer for {@link RequiredDistribution}.
+ *
+ * @see RequiredDistributionJsonSerializer for the reverse operation
+ */
+@Internal
+final class RequiredDistributionJsonDeserializer extends StdDeserializer<RequiredDistribution> {
     private static final long serialVersionUID = 1L;
 
-    public RequiredDistributionJsonDeserializer() {
+    RequiredDistributionJsonDeserializer() {
         super(RequiredDistribution.class);
     }
 
     @Override
     public RequiredDistribution deserialize(JsonParser jsonParser, DeserializationContext ctx)
-            throws IOException, JsonProcessingException {
+            throws IOException {
         JsonNode jsonNode = jsonParser.getCodec().readTree(jsonParser);
         DistributionType type =
                 DistributionType.valueOf(jsonNode.get("type").asText().toUpperCase());
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonSerializer.java
index df47b80..1817557 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty.DistributionType;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty.HashDistribution;
@@ -29,11 +30,16 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
 
 import java.io.IOException;
 
-/** JSON serializer for {@link RequiredDistribution}. */
-public class RequiredDistributionJsonSerializer extends StdSerializer<RequiredDistribution> {
+/**
+ * JSON serializer for {@link RequiredDistribution}.
+ *
+ * @see RequiredDistributionJsonDeserializer for the reverse operation
+ */
+@Internal
+final class RequiredDistributionJsonSerializer extends StdSerializer<RequiredDistribution> {
     private static final long serialVersionUID = 1L;
 
-    public RequiredDistributionJsonSerializer() {
+    RequiredDistributionJsonSerializer() {
         super(RequiredDistribution.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableJsonDeserializer.java
index b19a618..86ce77e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableJsonDeserializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
@@ -40,10 +41,16 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedCatal
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedCatalogTableJsonSerializer.PARTITION_KEYS;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedCatalogTableJsonSerializer.RESOLVED_SCHEMA;
 
-class ResolvedCatalogTableJsonDeserializer extends StdDeserializer<ResolvedCatalogTable> {
+/**
+ * JSON deserializer for {@link ResolvedCatalogTable}.
+ *
+ * @see ResolvedCatalogTableJsonSerializer for the reverse operation
+ */
+@Internal
+final class ResolvedCatalogTableJsonDeserializer extends StdDeserializer<ResolvedCatalogTable> {
     private static final long serialVersionUID = 1L;
 
-    public ResolvedCatalogTableJsonDeserializer() {
+    ResolvedCatalogTableJsonDeserializer() {
         super(ResolvedCatalogTable.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableJsonSerializer.java
index c088f57..25d9c2f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableJsonSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanCompilation;
@@ -31,17 +32,23 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
 
 import java.io.IOException;
 
-class ResolvedCatalogTableJsonSerializer extends StdSerializer<ResolvedCatalogTable> {
+/**
+ * JSON serializer for {@link ResolvedCatalogTable}.
+ *
+ * @see ResolvedCatalogTableJsonDeserializer for the reverse operation
+ */
+@Internal
+final class ResolvedCatalogTableJsonSerializer extends StdSerializer<ResolvedCatalogTable> {
     private static final long serialVersionUID = 1L;
 
     static final String SERIALIZE_OPTIONS = "serialize_options";
 
-    public static final String RESOLVED_SCHEMA = "schema";
-    public static final String PARTITION_KEYS = "partitionKeys";
-    public static final String OPTIONS = "options";
-    public static final String COMMENT = "comment";
+    static final String RESOLVED_SCHEMA = "schema";
+    static final String PARTITION_KEYS = "partitionKeys";
+    static final String OPTIONS = "options";
+    static final String COMMENT = "comment";
 
-    public ResolvedCatalogTableJsonSerializer() {
+    ResolvedCatalogTableJsonSerializer() {
         super(ResolvedCatalogTable.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedExpressionJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedExpressionJsonDeserializer.java
index f513ea9..2d6c1ee 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedExpressionJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedExpressionJsonDeserializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.expressions.ResolvedExpression;
@@ -41,9 +42,15 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedExpre
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedExpressionJsonSerializer.TYPE;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedExpressionJsonSerializer.TYPE_REX_NODE_EXPRESSION;
 
-class ResolvedExpressionJsonDeserializer extends StdDeserializer<ResolvedExpression> {
+/**
+ * JSON deserializer for {@link ResolvedExpression}.
+ *
+ * @see ResolvedExpressionJsonSerializer for the reverse operation
+ */
+@Internal
+final class ResolvedExpressionJsonDeserializer extends StdDeserializer<ResolvedExpression> {
 
-    protected ResolvedExpressionJsonDeserializer() {
+    ResolvedExpressionJsonDeserializer() {
         super(ResolvedExpression.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedExpressionJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedExpressionJsonSerializer.java
index 9dfa27d..b1c2009 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedExpressionJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedExpressionJsonSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.planner.expressions.RexNodeExpression;
@@ -28,14 +29,20 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
 
 import java.io.IOException;
 
-class ResolvedExpressionJsonSerializer extends StdSerializer<ResolvedExpression> {
+/**
+ * JSON serializer for {@link ResolvedExpression}.
+ *
+ * @see ResolvedExpressionJsonDeserializer for the reverse operation
+ */
+@Internal
+final class ResolvedExpressionJsonSerializer extends StdSerializer<ResolvedExpression> {
 
-    public static final String TYPE = "type";
-    public static final String TYPE_REX_NODE_EXPRESSION = "rexNodeExpression";
-    public static final String REX_NODE = "rexNode";
-    public static final String SERIALIZABLE_STRING = "serializableString";
+    static final String TYPE = "type";
+    static final String TYPE_REX_NODE_EXPRESSION = "rexNodeExpression";
+    static final String REX_NODE = "rexNode";
+    static final String SERIALIZABLE_STRING = "serializableString";
 
-    protected ResolvedExpressionJsonSerializer() {
+    ResolvedExpressionJsonSerializer() {
         super(ResolvedExpression.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonDeserializer.java
index 94e20ad..630ce93 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonDeserializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UniqueConstraint;
@@ -37,10 +38,16 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedSchem
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedSchemaJsonSerializer.PRIMARY_KEY;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedSchemaJsonSerializer.WATERMARK_SPECS;
 
-class ResolvedSchemaJsonDeserializer extends StdDeserializer<ResolvedSchema> {
+/**
+ * JSON deserializer for {@link ResolvedSchema}.
+ *
+ * @see ResolvedSchemaJsonSerializer for the reverse operation
+ */
+@Internal
+final class ResolvedSchemaJsonDeserializer extends StdDeserializer<ResolvedSchema> {
     private static final long serialVersionUID = 1L;
 
-    public ResolvedSchemaJsonDeserializer() {
+    ResolvedSchemaJsonDeserializer() {
         super(ResolvedSchema.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonSerializer.java
index da8bb8b..393d5ab 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.ResolvedSchema;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -28,14 +29,20 @@ import java.io.IOException;
 
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.serializeOptionalField;
 
-class ResolvedSchemaJsonSerializer extends StdSerializer<ResolvedSchema> {
+/**
+ * JSON serializer for {@link ResolvedSchema}.
+ *
+ * @see ResolvedSchemaJsonDeserializer for the reverse operation
+ */
+@Internal
+final class ResolvedSchemaJsonSerializer extends StdSerializer<ResolvedSchema> {
     private static final long serialVersionUID = 1L;
 
-    public static final String COLUMNS = "columns";
-    public static final String WATERMARK_SPECS = "watermarkSpecs";
-    public static final String PRIMARY_KEY = "primaryKey";
+    static final String COLUMNS = "columns";
+    static final String WATERMARK_SPECS = "watermarkSpecs";
+    static final String PRIMARY_KEY = "primaryKey";
 
-    public ResolvedSchemaJsonSerializer() {
+    ResolvedSchemaJsonSerializer() {
         super(ResolvedSchema.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
index ff9e322..8e52495 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
@@ -110,10 +110,10 @@ import static org.apache.flink.table.planner.typeutils.SymbolUtil.serializableTo
  * @see RexNodeJsonSerializer for the reverse operation
  */
 @Internal
-public class RexNodeJsonDeserializer extends StdDeserializer<RexNode> {
+final class RexNodeJsonDeserializer extends StdDeserializer<RexNode> {
     private static final long serialVersionUID = 1L;
 
-    public RexNodeJsonDeserializer() {
+    RexNodeJsonDeserializer() {
         super(RexNode.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
index d38f08a..7e38317 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
@@ -72,53 +72,53 @@ import static org.apache.flink.table.planner.typeutils.SymbolUtil.calciteToSeria
  * @see RexNodeJsonDeserializer for the reverse operation
  */
 @Internal
-public class RexNodeJsonSerializer extends StdSerializer<RexNode> {
+final class RexNodeJsonSerializer extends StdSerializer<RexNode> {
     private static final long serialVersionUID = 1L;
 
     // Common fields
-    public static final String FIELD_NAME_KIND = "kind";
-    public static final String FIELD_NAME_VALUE = "value";
-    public static final String FIELD_NAME_TYPE = "type";
-    public static final String FIELD_NAME_NAME = "name";
+    static final String FIELD_NAME_KIND = "kind";
+    static final String FIELD_NAME_VALUE = "value";
+    static final String FIELD_NAME_TYPE = "type";
+    static final String FIELD_NAME_NAME = "name";
 
     // INPUT_REF
-    public static final String KIND_INPUT_REF = "INPUT_REF";
-    public static final String FIELD_NAME_INPUT_INDEX = "inputIndex";
+    static final String KIND_INPUT_REF = "INPUT_REF";
+    static final String FIELD_NAME_INPUT_INDEX = "inputIndex";
 
     // LITERAL
-    public static final String KIND_LITERAL = "LITERAL";
+    static final String KIND_LITERAL = "LITERAL";
     // Sarg fields and values
-    public static final String FIELD_NAME_SARG = "sarg";
-    public static final String FIELD_NAME_RANGES = "ranges";
-    public static final String FIELD_NAME_BOUND_LOWER = "lower";
-    public static final String FIELD_NAME_BOUND_UPPER = "upper";
-    public static final String FIELD_NAME_BOUND_TYPE = "boundType";
-    public static final String FIELD_NAME_CONTAINS_NULL = "containsNull";
+    static final String FIELD_NAME_SARG = "sarg";
+    static final String FIELD_NAME_RANGES = "ranges";
+    static final String FIELD_NAME_BOUND_LOWER = "lower";
+    static final String FIELD_NAME_BOUND_UPPER = "upper";
+    static final String FIELD_NAME_BOUND_TYPE = "boundType";
+    static final String FIELD_NAME_CONTAINS_NULL = "containsNull";
     // Symbol fields
-    public static final String FIELD_NAME_SYMBOL = "symbol";
+    static final String FIELD_NAME_SYMBOL = "symbol";
 
     // FIELD_ACCESS
-    public static final String KIND_FIELD_ACCESS = "FIELD_ACCESS";
-    public static final String FIELD_NAME_EXPR = "expr";
+    static final String KIND_FIELD_ACCESS = "FIELD_ACCESS";
+    static final String FIELD_NAME_EXPR = "expr";
 
     // CORREL_VARIABLE
-    public static final String KIND_CORREL_VARIABLE = "CORREL_VARIABLE";
-    public static final String FIELD_NAME_CORREL = "correl";
+    static final String KIND_CORREL_VARIABLE = "CORREL_VARIABLE";
+    static final String FIELD_NAME_CORREL = "correl";
 
     // PATTERN_INPUT_REF
-    public static final String KIND_PATTERN_INPUT_REF = "PATTERN_INPUT_REF";
-    public static final String FIELD_NAME_ALPHA = "alpha";
+    static final String KIND_PATTERN_INPUT_REF = "PATTERN_INPUT_REF";
+    static final String FIELD_NAME_ALPHA = "alpha";
 
     // CALL
-    public static final String KIND_CALL = "CALL";
-    public static final String FIELD_NAME_OPERANDS = "operands";
-    public static final String FIELD_NAME_INTERNAL_NAME = "internalName";
-    public static final String FIELD_NAME_SYSTEM_NAME = "systemName";
-    public static final String FIELD_NAME_CATALOG_NAME = "catalogName";
-    public static final String FIELD_NAME_SYNTAX = "syntax";
-    public static final String FIELD_NAME_CLASS = "class";
-
-    public RexNodeJsonSerializer() {
+    static final String KIND_CALL = "CALL";
+    static final String FIELD_NAME_OPERANDS = "operands";
+    static final String FIELD_NAME_INTERNAL_NAME = "internalName";
+    static final String FIELD_NAME_SYSTEM_NAME = "systemName";
+    static final String FIELD_NAME_CATALOG_NAME = "catalogName";
+    static final String FIELD_NAME_SYNTAX = "syntax";
+    static final String FIELD_NAME_CLASS = "class";
+
+    RexNodeJsonSerializer() {
         super(RexNode.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonDeserializer.java
index 9f1b8e3..a663222 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonDeserializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
@@ -41,12 +42,14 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexWindowBoun
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexWindowBoundJsonSerializer.KIND_UNBOUNDED_PRECEDING;
 
 /**
- * JSON deserializer for {@link RexWindowBound}. refer to {@link RexWindowBoundJsonSerializer} for
- * serializer.
+ * JSON deserializer for {@link RexWindowBound}.
+ *
+ * @see RexWindowBoundJsonSerializer for the reverse operation
  */
-public class RexWindowBoundJsonDeserializer extends StdDeserializer<RexWindowBound> {
+@Internal
+final class RexWindowBoundJsonDeserializer extends StdDeserializer<RexWindowBound> {
 
-    public RexWindowBoundJsonDeserializer() {
+    RexWindowBoundJsonDeserializer() {
         super(RexWindowBound.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonSerializer.java
index 021731c..658db94 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -29,25 +30,24 @@ import org.apache.calcite.rex.RexWindowBound;
 import java.io.IOException;
 
 /**
- * JSON serializer for {@link RexWindowBound}. refer to {@link RexWindowBoundJsonDeserializer} for
- * deserializer.
+ * JSON serializer for {@link RexWindowBound}.
  *
- * <p>Supports serialize CURRENT_ROW, UNBOUNDED_PRECEDING, UNBOUNDED_FOLLOWING, Preceding Bounded
- * Window and Following Bounded Window.
+ * @see RexWindowBoundJsonDeserializer for the reverse operation
  */
-public class RexWindowBoundJsonSerializer extends StdSerializer<RexWindowBound> {
+@Internal
+final class RexWindowBoundJsonSerializer extends StdSerializer<RexWindowBound> {
 
-    public static final String FIELD_NAME_KIND = "kind";
-    public static final String KIND_CURRENT_ROW = "CURRENT_ROW";
-    public static final String KIND_UNBOUNDED_PRECEDING = "UNBOUNDED_PRECEDING";
-    public static final String KIND_UNBOUNDED_FOLLOWING = "UNBOUNDED_FOLLOWING";
-    public static final String KIND_BOUNDED_WINDOW = "BOUNDED_WINDOW";
+    static final String FIELD_NAME_KIND = "kind";
+    static final String KIND_CURRENT_ROW = "CURRENT_ROW";
+    static final String KIND_UNBOUNDED_PRECEDING = "UNBOUNDED_PRECEDING";
+    static final String KIND_UNBOUNDED_FOLLOWING = "UNBOUNDED_FOLLOWING";
+    static final String KIND_BOUNDED_WINDOW = "BOUNDED_WINDOW";
 
-    public static final String FIELD_NAME_IS_PRECEDING = "isPreceding";
-    public static final String FIELD_NAME_IS_FOLLOWING = "isFollowing";
-    public static final String FIELD_NAME_OFFSET = "offset";
+    static final String FIELD_NAME_IS_PRECEDING = "isPreceding";
+    static final String FIELD_NAME_IS_FOLLOWING = "isFollowing";
+    static final String FIELD_NAME_OFFSET = "offset";
 
-    public RexWindowBoundJsonSerializer() {
+    RexWindowBoundJsonSerializer() {
         super(RexWindowBound.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SerdeContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SerdeContext.java
index daec5ba..1274132 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SerdeContext.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SerdeContext.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.planner.calcite.FlinkContext;
@@ -33,6 +34,7 @@ import org.apache.calcite.sql.SqlOperatorTable;
 /**
  * A context to allow the store user-defined data within ExecNode serialization and deserialization.
  */
+@Internal
 public class SerdeContext {
     static final String SERDE_CONTEXT_KEY = "serdeCtx";
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ShuffleJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ShuffleJsonDeserializer.java
index 2a61222..80ff766 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ShuffleJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ShuffleJsonDeserializer.java
@@ -18,29 +18,34 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.Shuffle;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
 
 import java.io.IOException;
 
-/** JSON deserializer for {@link Shuffle}. */
-public class ShuffleJsonDeserializer extends StdDeserializer<Shuffle> {
+/**
+ * JSON deserializer for {@link Shuffle}.
+ *
+ * @see ShuffleJsonSerializer for the reverse operation
+ */
+@Internal
+final class ShuffleJsonDeserializer extends StdDeserializer<Shuffle> {
     private static final long serialVersionUID = 1L;
 
-    public ShuffleJsonDeserializer() {
+    ShuffleJsonDeserializer() {
         super(Shuffle.class);
     }
 
     @Override
     public Shuffle deserialize(JsonParser jsonParser, DeserializationContext ctx)
-            throws IOException, JsonProcessingException {
+            throws IOException {
         JsonNode jsonNode = jsonParser.getCodec().readTree(jsonParser);
         Shuffle.Type type = Shuffle.Type.valueOf(jsonNode.get("type").asText().toUpperCase());
         switch (type) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ShuffleJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ShuffleJsonSerializer.java
index 1607df4..5603186 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ShuffleJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ShuffleJsonSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.HashShuffle;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.Shuffle;
@@ -28,11 +29,16 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
 
 import java.io.IOException;
 
-/** JSON serializer for {@link Shuffle}. */
-public class ShuffleJsonSerializer extends StdSerializer<Shuffle> {
+/**
+ * JSON serializer for {@link Shuffle}.
+ *
+ * @see ShuffleJsonDeserializer for the reverse operation
+ */
+@Internal
+final class ShuffleJsonSerializer extends StdSerializer<Shuffle> {
     private static final long serialVersionUID = 1L;
 
-    public ShuffleJsonSerializer() {
+    ShuffleJsonSerializer() {
         super(Shuffle.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java
index 78a87829..6250157 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.Constraint.ConstraintType;
 import org.apache.flink.table.catalog.UniqueConstraint;
 
@@ -28,12 +29,13 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
 import java.util.List;
 
 /** Mixin for {@link UniqueConstraint}. */
+@Internal
 abstract class UniqueConstraintMixin {
 
-    public static final String NAME = "name";
-    public static final String ENFORCED = "enforced";
-    public static final String TYPE = "type";
-    public static final String COLUMNS = "columns";
+    static final String NAME = "name";
+    static final String ENFORCED = "enforced";
+    static final String TYPE = "type";
+    static final String COLUMNS = "columns";
 
     @JsonCreator
     private UniqueConstraintMixin(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WatermarkSpecMixin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WatermarkSpecMixin.java
index 26fa1c9..d914c48 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WatermarkSpecMixin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WatermarkSpecMixin.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.WatermarkSpec;
 import org.apache.flink.table.expressions.ResolvedExpression;
 
@@ -25,10 +26,11 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCre
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
 /** Mixin for {@link WatermarkSpec}. */
+@Internal
 abstract class WatermarkSpecMixin {
 
-    public static final String ROWTIME_ATTRIBUTE = "rowtimeAttribute";
-    public static final String EXPRESSION = "expression";
+    static final String ROWTIME_ATTRIBUTE = "rowtimeAttribute";
+    static final String EXPRESSION = "expression";
 
     @JsonCreator
     private WatermarkSpecMixin(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WindowReferenceJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WindowReferenceJsonDeserializer.java
index 2a03d5c..f4abacb 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WindowReferenceJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WindowReferenceJsonDeserializer.java
@@ -40,10 +40,10 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.WindowReferen
  */
 @Deprecated
 @Internal
-public final class WindowReferenceJsonDeserializer extends StdDeserializer<WindowReference> {
+final class WindowReferenceJsonDeserializer extends StdDeserializer<WindowReference> {
     private static final long serialVersionUID = 1L;
 
-    protected WindowReferenceJsonDeserializer() {
+    WindowReferenceJsonDeserializer() {
         super(WindowReference.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WindowReferenceJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WindowReferenceJsonSerializer.java
index 179a4cd..e1f296c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WindowReferenceJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WindowReferenceJsonSerializer.java
@@ -35,13 +35,13 @@ import java.io.IOException;
  */
 @Deprecated
 @Internal
-public final class WindowReferenceJsonSerializer extends StdSerializer<WindowReference> {
+final class WindowReferenceJsonSerializer extends StdSerializer<WindowReference> {
     private static final long serialVersionUID = 1L;
 
-    public static final String FIELD_NAME_NAME = "name";
-    public static final String FIELD_NAME_TYPE = "type";
+    static final String FIELD_NAME_NAME = "name";
+    static final String FIELD_NAME_TYPE = "type";
 
-    protected WindowReferenceJsonSerializer() {
+    WindowReferenceJsonSerializer() {
         super(WindowReference.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/OverSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/OverSpec.java
index 63d1fdc..c7ea182 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/OverSpec.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/OverSpec.java
@@ -18,15 +18,10 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.spec;
 
-import org.apache.flink.table.planner.plan.nodes.exec.serde.RexWindowBoundJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.RexWindowBoundJsonSerializer;
-
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rex.RexLiteral;
@@ -129,14 +124,10 @@ public class OverSpec {
 
         /** The lower bound of the window. */
         @JsonProperty(FIELD_NAME_LOWER_BOUND)
-        @JsonSerialize(using = RexWindowBoundJsonSerializer.class)
-        @JsonDeserialize(using = RexWindowBoundJsonDeserializer.class)
         private final RexWindowBound lowerBound;
 
         /** The upper bound of the window. */
         @JsonProperty(FIELD_NAME_UPPER_BOUND)
-        @JsonSerialize(using = RexWindowBoundJsonSerializer.class)
-        @JsonDeserialize(using = RexWindowBoundJsonDeserializer.class)
         private final RexWindowBound upperBound;
 
         /** The agg functions set. */
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
index 7b31d27..9e68aa5 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
@@ -35,8 +35,6 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
 import org.apache.flink.table.planner.plan.utils.AggregateUtil;
@@ -56,8 +54,6 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.tools.RelBuilder;
@@ -108,8 +104,6 @@ public class StreamExecGlobalGroupAggregate extends StreamExecAggregateBase {
 
     /** The input row type of this node's local agg. */
     @JsonProperty(FIELD_NAME_LOCAL_AGG_INPUT_ROW_TYPE)
-    @JsonSerialize(using = LogicalTypeJsonSerializer.class)
-    @JsonDeserialize(using = LogicalTypeJsonDeserializer.class)
     private final RowType localAggInputRowType;
 
     /** Whether this node will generate UPDATE_BEFORE messages. */
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
index b026708..d3c2710 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
@@ -34,8 +34,6 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
 import org.apache.flink.table.planner.plan.utils.AggregateUtil;
@@ -57,8 +55,6 @@ import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.tools.RelBuilder;
@@ -99,8 +95,6 @@ public class StreamExecGlobalWindowAggregate extends StreamExecWindowAggregateBa
 
     /** The input row type of this node's local agg. */
     @JsonProperty(FIELD_NAME_LOCAL_AGG_INPUT_ROW_TYPE)
-    @JsonSerialize(using = LogicalTypeJsonSerializer.class)
-    @JsonDeserialize(using = LogicalTypeJsonDeserializer.class)
     private final RowType localAggInputRowType;
 
     public StreamExecGlobalWindowAggregate(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java
index be4cd76..0e33097 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java
@@ -41,8 +41,6 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalWindowJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalWindowJsonSerializer;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
 import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
@@ -68,8 +66,6 @@ import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.tools.RelBuilder;
@@ -133,8 +129,6 @@ public class StreamExecGroupWindowAggregate extends StreamExecAggregateBase {
     private final AggregateCall[] aggCalls;
 
     @JsonProperty(FIELD_NAME_WINDOW)
-    @JsonSerialize(using = LogicalWindowJsonSerializer.class)
-    @JsonDeserialize(using = LogicalWindowJsonDeserializer.class)
     private final LogicalWindow window;
 
     @JsonProperty(FIELD_NAME_NAMED_WINDOW_PROPERTIES)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
index e6e354d..9322899 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
@@ -32,8 +32,6 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
 import org.apache.flink.table.planner.plan.utils.AggregateUtil;
@@ -49,8 +47,6 @@ import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.tools.RelBuilder;
@@ -107,8 +103,6 @@ public class StreamExecIncrementalGroupAggregate extends StreamExecAggregateBase
 
     /** The input row type of this node's partial local agg. */
     @JsonProperty(FIELD_NAME_PARTIAL_LOCAL_AGG_INPUT_TYPE)
-    @JsonSerialize(using = LogicalTypeJsonSerializer.class)
-    @JsonDeserialize(using = LogicalTypeJsonDeserializer.class)
     private final RowType partialLocalAggInputType;
 
     /** Whether this node consumes retraction messages. */
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
index 0577a51..2334159 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
@@ -33,8 +33,6 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.ChangelogModeJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.ChangelogModeJsonSerializer;
 import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec;
 import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -43,8 +41,6 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -80,8 +76,6 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
     public static final String FIELD_NAME_REQUIRE_UPSERT_MATERIALIZE = "requireUpsertMaterialize";
 
     @JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODE)
-    @JsonSerialize(using = ChangelogModeJsonSerializer.class)
-    @JsonDeserialize(using = ChangelogModeJsonDeserializer.class)
     private final ChangelogMode inputChangelogMode;
 
     @JsonProperty(FIELD_NAME_REQUIRE_UPSERT_MATERIALIZE)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
index 2b973dc6..0353338 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
@@ -24,8 +24,6 @@ import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.TableFunctionProvider;
 import org.apache.flink.table.functions.UserDefinedFunction;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer;
 import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable;
 import org.apache.flink.table.planner.plan.schema.TableSourceTable;
 import org.apache.flink.table.runtime.connector.source.LookupRuntimeProviderContext;
@@ -38,8 +36,6 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rex.RexLiteral;
@@ -77,8 +73,6 @@ public final class LookupJoinUtil {
         public final LogicalType sourceType;
 
         @JsonProperty(FIELD_NAME_LITERAL)
-        @JsonSerialize(using = RexNodeJsonSerializer.class)
-        @JsonDeserialize(using = RexNodeJsonDeserializer.class)
         public final RexLiteral literal;
 
         @JsonCreator