You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2022/02/25 08:51:19 UTC

[flink] 02/02: [hotfix][table-planner] Clean up serde classes

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3d8748316f597e6936442dc514f266ffbcce4bed
Author: Timo Walther <tw...@apache.org>
AuthorDate: Thu Feb 24 14:21:25 2022 +0100

    [hotfix][table-planner] Clean up serde classes
---
 .../plan/abilities/sink/WritingMetadataSpec.java   |  6 ---
 .../abilities/source/SourceAbilitySpecBase.java    |  6 ---
 .../planner/plan/logical/WindowingStrategy.java    |  6 ---
 .../table/planner/plan/nodes/exec/ExecNode.java    |  6 ---
 .../planner/plan/nodes/exec/InputProperty.java     |  6 ---
 .../plan/nodes/exec/common/CommonExecValues.java   | 10 +---
 .../exec/serde/AggregateCallJsonDeserializer.java  | 12 +++--
 .../exec/serde/AggregateCallJsonSerializer.java    | 18 +++----
 .../exec/serde/ChangelogModeJsonDeserializer.java  | 14 ++---
 .../exec/serde/ChangelogModeJsonSerializer.java    | 11 ++--
 .../nodes/exec/serde/ColumnJsonDeserializer.java   | 11 +++-
 .../nodes/exec/serde/ColumnJsonSerializer.java     | 31 ++++++-----
 .../ContextResolvedTableJsonDeserializer.java      | 11 +++-
 .../serde/ContextResolvedTableJsonSerializer.java  | 16 ++++--
 .../nodes/exec/serde/DataTypeJsonDeserializer.java |  4 +-
 .../nodes/exec/serde/DataTypeJsonSerializer.java   | 20 ++++----
 .../exec/serde/ExecNodeGraphJsonDeserializer.java  | 13 +++--
 .../exec/serde/ExecNodeGraphJsonSerializer.java    | 11 ++--
 .../exec/serde/FlinkVersionJsonDeserializer.java   | 12 +++--
 .../exec/serde/FlinkVersionJsonSerializer.java     | 12 +++--
 .../plan/nodes/exec/serde/JsonPlanEdge.java        | 24 +++++----
 .../plan/nodes/exec/serde/JsonPlanGraph.java       | 16 +++---
 .../plan/nodes/exec/serde/JsonSerdeUtil.java       | 24 ++++-----
 .../exec/serde/LogicalTypeJsonDeserializer.java    |  4 +-
 .../exec/serde/LogicalTypeJsonSerializer.java      | 60 +++++++++++-----------
 .../exec/serde/LogicalWindowJsonDeserializer.java  | 11 ++--
 .../exec/serde/LogicalWindowJsonSerializer.java    | 39 +++++++-------
 .../serde/ObjectIdentifierJsonDeserializer.java    | 12 +++--
 .../exec/serde/ObjectIdentifierJsonSerializer.java | 12 +++--
 .../exec/serde/RelDataTypeJsonDeserializer.java    |  4 +-
 .../exec/serde/RelDataTypeJsonSerializer.java      |  4 +-
 .../RequiredDistributionJsonDeserializer.java      | 15 ++++--
 .../serde/RequiredDistributionJsonSerializer.java  | 12 +++--
 .../ResolvedCatalogTableJsonDeserializer.java      | 11 +++-
 .../serde/ResolvedCatalogTableJsonSerializer.java  | 19 ++++---
 .../serde/ResolvedExpressionJsonDeserializer.java  | 11 +++-
 .../serde/ResolvedExpressionJsonSerializer.java    | 19 ++++---
 .../exec/serde/ResolvedSchemaJsonDeserializer.java | 11 +++-
 .../exec/serde/ResolvedSchemaJsonSerializer.java   | 17 ++++--
 .../nodes/exec/serde/RexNodeJsonDeserializer.java  |  4 +-
 .../nodes/exec/serde/RexNodeJsonSerializer.java    | 60 +++++++++++-----------
 .../exec/serde/RexWindowBoundJsonDeserializer.java | 11 ++--
 .../exec/serde/RexWindowBoundJsonSerializer.java   | 28 +++++-----
 .../plan/nodes/exec/serde/SerdeContext.java        |  2 +
 .../nodes/exec/serde/ShuffleJsonDeserializer.java  | 15 ++++--
 .../nodes/exec/serde/ShuffleJsonSerializer.java    | 12 +++--
 .../nodes/exec/serde/UniqueConstraintMixin.java    | 10 ++--
 .../plan/nodes/exec/serde/WatermarkSpecMixin.java  |  6 ++-
 .../serde/WindowReferenceJsonDeserializer.java     |  4 +-
 .../exec/serde/WindowReferenceJsonSerializer.java  |  8 +--
 .../planner/plan/nodes/exec/spec/OverSpec.java     |  9 ----
 .../stream/StreamExecGlobalGroupAggregate.java     |  6 ---
 .../stream/StreamExecGlobalWindowAggregate.java    |  6 ---
 .../stream/StreamExecGroupWindowAggregate.java     |  6 ---
 .../StreamExecIncrementalGroupAggregate.java       |  6 ---
 .../plan/nodes/exec/stream/StreamExecSink.java     |  6 ---
 .../table/planner/plan/utils/LookupJoinUtil.java   |  6 ---
 57 files changed, 416 insertions(+), 350 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/WritingMetadataSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/WritingMetadataSpec.java
index 46a9845..dcdd948 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/WritingMetadataSpec.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/WritingMetadataSpec.java
@@ -21,8 +21,6 @@ package org.apache.flink.table.planner.plan.abilities.sink;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.utils.TypeConversions;
@@ -31,8 +29,6 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCre
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 import java.util.List;
 import java.util.Objects;
@@ -52,8 +48,6 @@ public final class WritingMetadataSpec implements SinkAbilitySpec {
     private final List<String> metadataKeys;
 
     @JsonProperty(FIELD_NAME_CONSUMED_TYPE)
-    @JsonSerialize(using = LogicalTypeJsonSerializer.class)
-    @JsonDeserialize(using = LogicalTypeJsonDeserializer.class)
     private final LogicalType consumedType;
 
     @JsonCreator
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpecBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpecBase.java
index 8764205..1b5cbf3 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpecBase.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpecBase.java
@@ -18,15 +18,11 @@
 
 package org.apache.flink.table.planner.plan.abilities.source;
 
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 import javax.annotation.Nullable;
 
@@ -40,8 +36,6 @@ public abstract class SourceAbilitySpecBase implements SourceAbilitySpec {
     public static final String FIELD_NAME_PRODUCED_TYPE = "producedType";
 
     @JsonProperty(FIELD_NAME_PRODUCED_TYPE)
-    @JsonSerialize(using = LogicalTypeJsonSerializer.class)
-    @JsonDeserialize(using = LogicalTypeJsonDeserializer.class)
     private final @Nullable RowType producedType;
 
     public SourceAbilitySpecBase() {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/WindowingStrategy.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/WindowingStrategy.java
index d825419..47bca8d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/WindowingStrategy.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/WindowingStrategy.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.planner.plan.logical;
 
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
 
@@ -27,8 +25,6 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgn
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 /** Logical representation of a windowing strategy. */
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "strategy")
@@ -46,8 +42,6 @@ public abstract class WindowingStrategy {
     protected final WindowSpec window;
 
     @JsonProperty(value = FIELD_NAME_TIME_ATTRIBUTE_TYPE)
-    @JsonSerialize(using = LogicalTypeJsonSerializer.class)
-    @JsonDeserialize(using = LogicalTypeJsonDeserializer.class)
     protected final LogicalType timeAttributeType;
 
     @JsonProperty(FIELD_NAME_IS_ROWTIME)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
index 229305f..81d07b8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
@@ -20,8 +20,6 @@ package org.apache.flink.table.planner.plan.nodes.exec;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer;
 import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor;
 import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -30,8 +28,6 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver;
 
 import java.util.List;
@@ -75,8 +71,6 @@ public interface ExecNode<T> extends ExecNodeTranslator<T> {
      * data structures.
      */
     @JsonProperty(value = FIELD_NAME_OUTPUT_TYPE)
-    @JsonSerialize(using = LogicalTypeJsonSerializer.class)
-    @JsonDeserialize(using = LogicalTypeJsonDeserializer.class)
     LogicalType getOutputType();
 
     /**
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/InputProperty.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/InputProperty.java
index d4ecbcf..a4c8205 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/InputProperty.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/InputProperty.java
@@ -20,14 +20,10 @@ package org.apache.flink.table.planner.plan.nodes.exec;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.operators.Input;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.RequiredDistributionJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.RequiredDistributionJsonSerializer;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 import java.util.Arrays;
 import java.util.Objects;
@@ -82,8 +78,6 @@ public class InputProperty {
      * corresponding input.
      */
     @JsonProperty(FIELD_NAME_REQUIRED_DISTRIBUTION)
-    @JsonSerialize(using = RequiredDistributionJsonSerializer.class)
-    @JsonDeserialize(using = RequiredDistributionJsonDeserializer.class)
     private final RequiredDistribution requiredDistribution;
 
     /** How does the input record trigger the output behavior of the target {@link ExecNode}. */
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecValues.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecValues.java
index 5a47186..21be27f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecValues.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecValues.java
@@ -27,14 +27,12 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer;
 import org.apache.flink.table.runtime.operators.values.ValuesInputFormat;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
 import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
 
 import java.util.Collections;
 import java.util.List;
@@ -78,12 +76,8 @@ public abstract class CommonExecValues extends ExecNodeBase<RowData>
         return transformation;
     }
 
-    /**
-     * In order to use {@link RexNodeJsonSerializer} to serialize {@link RexLiteral}, so we force
-     * cast element of tuples to {@link RexNode} which is the parent class of {@link RexLiteral}.
-     */
     @JsonProperty(value = FIELD_NAME_TUPLES)
-    public List<List<RexNode>> getTuples() {
-        return (List<List<RexNode>>) (Object) tuples;
+    public List<List<RexLiteral>> getTuples() {
+        return tuples;
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonDeserializer.java
index 28eef53..6697534 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonDeserializer.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
+
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -41,13 +43,15 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCall
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.AggregateCallJsonSerializer.FIELD_NAME_TYPE;
 
 /**
- * JSON deserializer for {@link AggregateCall}. refer to {@link AggregateCallJsonSerializer} for
- * serializer.
+ * JSON deserializer for {@link AggregateCall}.
+ *
+ * @see AggregateCallJsonSerializer for the reverse operation
  */
-public class AggregateCallJsonDeserializer extends StdDeserializer<AggregateCall> {
+@Internal
+final class AggregateCallJsonDeserializer extends StdDeserializer<AggregateCall> {
     private static final long serialVersionUID = 1L;
 
-    public AggregateCallJsonDeserializer() {
+    AggregateCallJsonDeserializer() {
         super(AggregateCall.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java
index 35e3ac9..345400f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java
@@ -37,18 +37,18 @@ import java.io.IOException;
  * @see AggregateCallJsonDeserializer for the reverse operation
  */
 @Internal
-public class AggregateCallJsonSerializer extends StdSerializer<AggregateCall> {
+final class AggregateCallJsonSerializer extends StdSerializer<AggregateCall> {
     private static final long serialVersionUID = 1L;
 
-    public static final String FIELD_NAME_NAME = "name";
-    public static final String FIELD_NAME_ARG_LIST = "argList";
-    public static final String FIELD_NAME_FILTER_ARG = "filterArg";
-    public static final String FIELD_NAME_DISTINCT = "distinct";
-    public static final String FIELD_NAME_APPROXIMATE = "approximate";
-    public static final String FIELD_NAME_IGNORE_NULLS = "ignoreNulls";
-    public static final String FIELD_NAME_TYPE = "type";
+    static final String FIELD_NAME_NAME = "name";
+    static final String FIELD_NAME_ARG_LIST = "argList";
+    static final String FIELD_NAME_FILTER_ARG = "filterArg";
+    static final String FIELD_NAME_DISTINCT = "distinct";
+    static final String FIELD_NAME_APPROXIMATE = "approximate";
+    static final String FIELD_NAME_IGNORE_NULLS = "ignoreNulls";
+    static final String FIELD_NAME_TYPE = "type";
 
-    public AggregateCallJsonSerializer() {
+    AggregateCallJsonSerializer() {
         super(AggregateCall.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonDeserializer.java
index b3c7351..e6590a7 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonDeserializer.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.types.RowKind;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
@@ -30,20 +30,22 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std
 import java.io.IOException;
 
 /**
- * JSON deserializer for {@link ChangelogMode}. refer to {@link ChangelogModeJsonSerializer} for
- * serializer.
+ * JSON deserializer for {@link ChangelogMode}.
+ *
+ * @see ChangelogModeJsonSerializer for the reverse operation
  */
-public class ChangelogModeJsonDeserializer extends StdDeserializer<ChangelogMode> {
+@Internal
+final class ChangelogModeJsonDeserializer extends StdDeserializer<ChangelogMode> {
     private static final long serialVersionUID = 1L;
 
-    public ChangelogModeJsonDeserializer() {
+    ChangelogModeJsonDeserializer() {
         super(ChangelogMode.class);
     }
 
     @Override
     public ChangelogMode deserialize(
             JsonParser jsonParser, DeserializationContext deserializationContext)
-            throws IOException, JsonProcessingException {
+            throws IOException {
         ChangelogMode.Builder builder = ChangelogMode.newBuilder();
         JsonNode rowKindsNode = jsonParser.readValueAsTree();
         for (JsonNode rowKindNode : rowKindsNode) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonSerializer.java
index dbd5a4a..8d0c462 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ChangelogModeJsonSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.types.RowKind;
 
@@ -28,13 +29,15 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
 import java.io.IOException;
 
 /**
- * JSON serializer for {@link ChangelogMode}. refer to {@link ChangelogModeJsonDeserializer} for
- * deserializer.
+ * JSON serializer for {@link ChangelogMode}.
+ *
+ * @see ChangelogModeJsonDeserializer for the reverse operation
  */
-public class ChangelogModeJsonSerializer extends StdSerializer<ChangelogMode> {
+@Internal
+final class ChangelogModeJsonSerializer extends StdSerializer<ChangelogMode> {
     private static final long serialVersionUID = 1L;
 
-    public ChangelogModeJsonSerializer() {
+    ChangelogModeJsonSerializer() {
         super(ChangelogMode.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonDeserializer.java
index 15ba34a..6cbce5d3 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonDeserializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.expressions.ResolvedExpression;
@@ -47,12 +48,18 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSer
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.deserializeOptionalField;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.traverse;
 
-class ColumnJsonDeserializer extends StdDeserializer<Column> {
+/**
+ * JSON deserializer for {@link Column}.
+ *
+ * @see ColumnJsonSerializer for the reverse operation
+ */
+@Internal
+final class ColumnJsonDeserializer extends StdDeserializer<Column> {
 
     private static final String SUPPORTED_KINDS =
             Arrays.toString(new String[] {KIND_PHYSICAL, KIND_COMPUTED, KIND_METADATA});
 
-    public ColumnJsonDeserializer() {
+    ColumnJsonDeserializer() {
         super(Column.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonSerializer.java
index 3d6ded0..4721512 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.Column;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -28,20 +29,26 @@ import java.io.IOException;
 
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.serializeOptionalField;
 
-class ColumnJsonSerializer extends StdSerializer<Column> {
+/**
+ * JSON serializer for {@link Column}.
+ *
+ * @see ColumnJsonDeserializer for the reverse operation
+ */
+@Internal
+final class ColumnJsonSerializer extends StdSerializer<Column> {
 
-    public static final String KIND = "kind";
-    public static final String KIND_PHYSICAL = "PHYSICAL";
-    public static final String KIND_COMPUTED = "COMPUTED";
-    public static final String KIND_METADATA = "METADATA";
-    public static final String NAME = "name";
-    public static final String DATA_TYPE = "dataType";
-    public static final String COMMENT = "comment";
-    public static final String EXPRESSION = "expression";
-    public static final String METADATA_KEY = "metadataKey";
-    public static final String IS_VIRTUAL = "isVirtual";
+    static final String KIND = "kind";
+    static final String KIND_PHYSICAL = "PHYSICAL";
+    static final String KIND_COMPUTED = "COMPUTED";
+    static final String KIND_METADATA = "METADATA";
+    static final String NAME = "name";
+    static final String DATA_TYPE = "dataType";
+    static final String COMMENT = "comment";
+    static final String EXPRESSION = "expression";
+    static final String METADATA_KEY = "metadataKey";
+    static final String IS_VIRTUAL = "isVirtual";
 
-    public ColumnJsonSerializer() {
+    ColumnJsonSerializer() {
         super(Column.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonDeserializer.java
index c76091d..0828840 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonDeserializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanCompilation;
 import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanRestore;
@@ -45,10 +46,16 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.ContextResolv
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.ContextResolvedTableJsonSerializer.FIELD_NAME_IDENTIFIER;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedCatalogTableJsonSerializer.OPTIONS;
 
-class ContextResolvedTableJsonDeserializer extends StdDeserializer<ContextResolvedTable> {
+/**
+ * JSON deserializer for {@link ContextResolvedTable}.
+ *
+ * @see ContextResolvedTableJsonSerializer for the reverse operation
+ */
+@Internal
+final class ContextResolvedTableJsonDeserializer extends StdDeserializer<ContextResolvedTable> {
     private static final long serialVersionUID = 1L;
 
-    public ContextResolvedTableJsonDeserializer() {
+    ContextResolvedTableJsonDeserializer() {
         super(ContextResolvedTable.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonSerializer.java
index 0b5d98e..5101198 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanCompilation;
@@ -30,14 +31,19 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
 
 import java.io.IOException;
 
-/** JSON serializer for {@link ContextResolvedTable}. */
-class ContextResolvedTableJsonSerializer extends StdSerializer<ContextResolvedTable> {
+/**
+ * JSON serializer for {@link ContextResolvedTable}.
+ *
+ * @see ContextResolvedTableJsonDeserializer for the reverse operation
+ */
+@Internal
+final class ContextResolvedTableJsonSerializer extends StdSerializer<ContextResolvedTable> {
     private static final long serialVersionUID = 1L;
 
-    public static final String FIELD_NAME_IDENTIFIER = "identifier";
-    public static final String FIELD_NAME_CATALOG_TABLE = "resolvedTable";
+    static final String FIELD_NAME_IDENTIFIER = "identifier";
+    static final String FIELD_NAME_CATALOG_TABLE = "resolvedTable";
 
-    public ContextResolvedTableJsonSerializer() {
+    ContextResolvedTableJsonSerializer() {
         super(ContextResolvedTable.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonDeserializer.java
index 3a234b2..21f456e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonDeserializer.java
@@ -59,9 +59,9 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil
  * @see DataTypeJsonSerializer for the reverse operation
  */
 @Internal
-public class DataTypeJsonDeserializer extends StdDeserializer<DataType> {
+final class DataTypeJsonDeserializer extends StdDeserializer<DataType> {
 
-    public DataTypeJsonDeserializer() {
+    DataTypeJsonDeserializer() {
         super(DataType.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerializer.java
index 53938a8..8593493 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerializer.java
@@ -41,7 +41,7 @@ import static org.apache.flink.table.types.utils.DataTypeUtils.isInternal;
  * @see DataTypeJsonDeserializer for the reverse operation
  */
 @Internal
-public final class DataTypeJsonSerializer extends StdSerializer<DataType> {
+final class DataTypeJsonSerializer extends StdSerializer<DataType> {
     private static final long serialVersionUID = 1L;
 
     /*
@@ -83,22 +83,22 @@ public final class DataTypeJsonSerializer extends StdSerializer<DataType> {
      */
 
     // Common fields
-    public static final String FIELD_NAME_TYPE = "logicalType";
-    public static final String FIELD_NAME_CONVERSION_CLASS = "conversionClass";
+    static final String FIELD_NAME_TYPE = "logicalType";
+    static final String FIELD_NAME_CONVERSION_CLASS = "conversionClass";
 
     // ARRAY, MULTISET
-    public static final String FIELD_NAME_ELEMENT_CLASS = "elementClass";
+    static final String FIELD_NAME_ELEMENT_CLASS = "elementClass";
 
     // MAP
-    public static final String FIELD_NAME_KEY_CLASS = "keyClass";
-    public static final String FIELD_NAME_VALUE_CLASS = "valueClass";
+    static final String FIELD_NAME_KEY_CLASS = "keyClass";
+    static final String FIELD_NAME_VALUE_CLASS = "valueClass";
 
     // ROW, STRUCTURED_TYPE, DISTINCT_TYPE
-    public static final String FIELD_NAME_FIELDS = "fields";
-    public static final String FIELD_NAME_FIELD_NAME = "name";
-    public static final String FIELD_NAME_FIELD_CLASS = "fieldClass";
+    static final String FIELD_NAME_FIELDS = "fields";
+    static final String FIELD_NAME_FIELD_NAME = "name";
+    static final String FIELD_NAME_FIELD_CLASS = "fieldClass";
 
-    public DataTypeJsonSerializer() {
+    DataTypeJsonSerializer() {
         super(DataType.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonDeserializer.java
index 4fef3bb..f353012 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonDeserializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
@@ -27,13 +28,17 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std
 import java.io.IOException;
 
 /**
- * JSON deserializer for {@link ExecNodeGraph}. It uses the intermediate representation {@link
- * JsonPlanGraph}.
+ * JSON deserializer for {@link ExecNodeGraph}.
+ *
+ * <p>It uses the intermediate representation {@link JsonPlanGraph}.
+ *
+ * @see ExecNodeGraphJsonSerializer for the reverse operation
  */
-public class ExecNodeGraphJsonDeserializer extends StdDeserializer<ExecNodeGraph> {
+@Internal
+final class ExecNodeGraphJsonDeserializer extends StdDeserializer<ExecNodeGraph> {
     private static final long serialVersionUID = 1L;
 
-    public ExecNodeGraphJsonDeserializer() {
+    ExecNodeGraphJsonDeserializer() {
         super(ExecNodeGraph.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonSerializer.java
index c424f91..752f482 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphValidator;
 import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor;
@@ -29,13 +30,15 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
 import java.io.IOException;
 
 /**
- * JSON serializer for {@link ExecNodeGraph}. It uses the intermediate representation {@link
- * JsonPlanGraph}.
+ * JSON serializer for {@link ExecNodeGraph}.
+ *
+ * @see ExecNodeGraphJsonDeserializer for the reverse operation
  */
-public class ExecNodeGraphJsonSerializer extends StdSerializer<ExecNodeGraph> {
+@Internal
+final class ExecNodeGraphJsonSerializer extends StdSerializer<ExecNodeGraph> {
     private static final long serialVersionUID = 1L;
 
-    public ExecNodeGraphJsonSerializer() {
+    ExecNodeGraphJsonSerializer() {
         super(ExecNodeGraph.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/FlinkVersionJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/FlinkVersionJsonDeserializer.java
index 968b93c..7f2fb0a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/FlinkVersionJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/FlinkVersionJsonDeserializer.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
 import org.apache.flink.FlinkVersion;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
@@ -27,11 +28,16 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std
 
 import java.io.IOException;
 
-/** JSON deserializer for {@link FlinkVersion}. */
-public class FlinkVersionJsonDeserializer extends StdDeserializer<FlinkVersion> {
+/**
+ * JSON deserializer for {@link FlinkVersion}.
+ *
+ * @see FlinkVersionJsonSerializer for the reverse operation
+ */
+@Internal
+final class FlinkVersionJsonDeserializer extends StdDeserializer<FlinkVersion> {
     private static final long serialVersionUID = 1L;
 
-    public FlinkVersionJsonDeserializer() {
+    FlinkVersionJsonDeserializer() {
         super(FlinkVersion.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/FlinkVersionJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/FlinkVersionJsonSerializer.java
index 3420314..b43c578 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/FlinkVersionJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/FlinkVersionJsonSerializer.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
 import org.apache.flink.FlinkVersion;
+import org.apache.flink.annotation.Internal;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
@@ -26,11 +27,16 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
 
 import java.io.IOException;
 
-/** JSON serializer for {@link FlinkVersion}. */
-public class FlinkVersionJsonSerializer extends StdSerializer<FlinkVersion> {
+/**
+ * JSON serializer for {@link FlinkVersion}.
+ *
+ * @see FlinkVersionJsonDeserializer for the reverse operation
+ */
+@Internal
+final class FlinkVersionJsonSerializer extends StdSerializer<FlinkVersion> {
     private static final long serialVersionUID = 1L;
 
-    public FlinkVersionJsonSerializer() {
+    FlinkVersionJsonSerializer() {
         super(FlinkVersion.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonPlanEdge.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonPlanEdge.java
index 17c5c5e..f5401b6 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonPlanEdge.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonPlanEdge.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
@@ -36,12 +37,13 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotatio
  *
  * <p>This model is used only during serialization/deserialization.
  */
+@Internal
 @JsonIgnoreProperties(ignoreUnknown = true)
-class JsonPlanEdge {
-    public static final String FIELD_NAME_SOURCE = "source";
-    public static final String FIELD_NAME_TARGET = "target";
-    public static final String FIELD_NAME_SHUFFLE = "shuffle";
-    public static final String FIELD_NAME_SHUFFLE_MODE = "shuffleMode";
+final class JsonPlanEdge {
+    static final String FIELD_NAME_SOURCE = "source";
+    static final String FIELD_NAME_TARGET = "target";
+    static final String FIELD_NAME_SHUFFLE = "shuffle";
+    static final String FIELD_NAME_SHUFFLE_MODE = "shuffleMode";
 
     /** The source node id of this edge. */
     @JsonProperty(FIELD_NAME_SOURCE)
@@ -59,7 +61,7 @@ class JsonPlanEdge {
     private final StreamExchangeMode exchangeMode;
 
     @JsonCreator
-    public JsonPlanEdge(
+    JsonPlanEdge(
             @JsonProperty(FIELD_NAME_SOURCE) int sourceId,
             @JsonProperty(FIELD_NAME_TARGET) int targetId,
             @JsonProperty(FIELD_NAME_SHUFFLE) ExecEdge.Shuffle shuffle,
@@ -70,24 +72,24 @@ class JsonPlanEdge {
         this.exchangeMode = exchangeMode;
     }
 
-    public int getSourceId() {
+    int getSourceId() {
         return sourceId;
     }
 
-    public int getTargetId() {
+    int getTargetId() {
         return targetId;
     }
 
-    public ExecEdge.Shuffle getShuffle() {
+    ExecEdge.Shuffle getShuffle() {
         return shuffle;
     }
 
-    public StreamExchangeMode getExchangeMode() {
+    StreamExchangeMode getExchangeMode() {
         return exchangeMode;
     }
 
     /** Build {@link JsonPlanEdge} from an {@link ExecEdge}. */
-    public static JsonPlanEdge fromExecEdge(ExecEdge execEdge) {
+    static JsonPlanEdge fromExecEdge(ExecEdge execEdge) {
         return new JsonPlanEdge(
                 execEdge.getSource().getId(),
                 execEdge.getTarget().getId(),
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonPlanGraph.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonPlanGraph.java
index 9e6278c..4dd4a93 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonPlanGraph.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonPlanGraph.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
 import org.apache.flink.FlinkVersion;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
@@ -48,10 +49,11 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  *
  * <p>This model is used only during serialization/deserialization.
  */
-class JsonPlanGraph {
-    public static final String FIELD_NAME_FLINK_VERSION = "flinkVersion";
-    public static final String FIELD_NAME_NODES = "nodes";
-    public static final String FIELD_NAME_EDGES = "edges";
+@Internal
+final class JsonPlanGraph {
+    static final String FIELD_NAME_FLINK_VERSION = "flinkVersion";
+    static final String FIELD_NAME_NODES = "nodes";
+    static final String FIELD_NAME_EDGES = "edges";
 
     @JsonProperty(FIELD_NAME_FLINK_VERSION)
     private final FlinkVersion flinkVersion;
@@ -63,7 +65,7 @@ class JsonPlanGraph {
     private final List<JsonPlanEdge> edges;
 
     @JsonCreator
-    public JsonPlanGraph(
+    JsonPlanGraph(
             @JsonProperty(FIELD_NAME_FLINK_VERSION) FlinkVersion flinkVersion,
             @JsonProperty(FIELD_NAME_NODES) List<ExecNode<?>> nodes,
             @JsonProperty(FIELD_NAME_EDGES) List<JsonPlanEdge> edges) {
@@ -72,7 +74,7 @@ class JsonPlanGraph {
         this.edges = edges;
     }
 
-    public static JsonPlanGraph fromExecNodeGraph(ExecNodeGraph execGraph) {
+    static JsonPlanGraph fromExecNodeGraph(ExecNodeGraph execGraph) {
         final List<ExecNode<?>> allNodes = new ArrayList<>();
         final List<JsonPlanEdge> allEdges = new ArrayList<>();
         final Set<Integer> nodesIds = new HashSet<>();
@@ -112,7 +114,7 @@ class JsonPlanGraph {
         return new JsonPlanGraph(execGraph.getFlinkVersion(), allNodes, allEdges);
     }
 
-    public ExecNodeGraph convertToExecNodeGraph() {
+    ExecNodeGraph convertToExecNodeGraph() {
         Map<Integer, ExecNode<?>> idToExecNodes = new HashMap<>();
         for (ExecNode<?> execNode : nodes) {
             int id = execNode.getId();
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
index 7e25612..92bec58 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
 import org.apache.flink.FlinkVersion;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ContextResolvedTable;
@@ -31,11 +32,13 @@ import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.planner.plan.logical.LogicalWindow;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty.RequiredDistribution;
 import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
 import org.apache.flink.table.runtime.groupwindow.WindowReference;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.extraction.ExtractionUtils;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -69,7 +72,8 @@ import java.lang.annotation.Annotation;
 import java.lang.reflect.Constructor;
 import java.util.Optional;
 
-/** An utility class that provide abilities for JSON serialization and deserialization. */
+/** A utility class that provide abilities for JSON serialization and deserialization. */
+@Internal
 public class JsonSerdeUtil {
 
     /** Return true if the given class's constructors have @JsonCreator annotation, else false. */
@@ -124,7 +128,7 @@ public class JsonSerdeUtil {
 
     private static Module createFlinkTableJacksonModule() {
         final SimpleModule module = new SimpleModule("Flink table module");
-        ExecNodeMetadataUtil.execNodes().stream()
+        ExecNodeMetadataUtil.execNodes()
                 .forEach(c -> module.registerSubtypes(new NamedType(c, c.getName())));
         registerSerializers(module);
         registerDeserializers(module);
@@ -136,15 +140,10 @@ public class JsonSerdeUtil {
     private static void registerSerializers(SimpleModule module) {
         module.addSerializer(new ExecNodeGraphJsonSerializer());
         module.addSerializer(new FlinkVersionJsonSerializer());
-        // ObjectIdentifierJsonSerializer is needed for LogicalType serialization
         module.addSerializer(new ObjectIdentifierJsonSerializer());
-        // LogicalTypeJsonSerializer is needed for RelDataType serialization
         module.addSerializer(new LogicalTypeJsonSerializer());
-        // DataTypeJsonSerializer is needed for LogicalType serialization
         module.addSerializer(new DataTypeJsonSerializer());
-        // RelDataTypeJsonSerializer is needed for RexNode serialization
         module.addSerializer(new RelDataTypeJsonSerializer());
-        // RexNode is used in many exec nodes, so we register its serializer directly here
         module.addSerializer(new RexNodeJsonSerializer());
         module.addSerializer(new AggregateCallJsonSerializer());
         module.addSerializer(new ChangelogModeJsonSerializer());
@@ -156,24 +155,19 @@ public class JsonSerdeUtil {
         module.addSerializer(new ResolvedCatalogTableJsonSerializer());
         module.addSerializer(new ResolvedExpressionJsonSerializer());
         module.addSerializer(new ResolvedSchemaJsonSerializer());
+        module.addSerializer(new RequiredDistributionJsonSerializer());
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
     private static void registerDeserializers(SimpleModule module) {
         module.addDeserializer(ExecNodeGraph.class, new ExecNodeGraphJsonDeserializer());
         module.addDeserializer(FlinkVersion.class, new FlinkVersionJsonDeserializer());
-        // ObjectIdentifierJsonDeserializer is needed for LogicalType deserialization
         module.addDeserializer(ObjectIdentifier.class, new ObjectIdentifierJsonDeserializer());
-        // LogicalTypeJsonSerializer is needed for RelDataType serialization
         module.addDeserializer(LogicalType.class, new LogicalTypeJsonDeserializer());
-        // DataTypeJsonDeserializer is needed for LogicalType serialization
+        module.addDeserializer(RowType.class, (StdDeserializer) new LogicalTypeJsonDeserializer());
         module.addDeserializer(DataType.class, new DataTypeJsonDeserializer());
-        // RelDataTypeJsonSerializer is needed for RexNode serialization
         module.addDeserializer(RelDataType.class, new RelDataTypeJsonDeserializer());
-        // RexNode is used in many exec nodes, so we register its deserializer directly here
         module.addDeserializer(RexNode.class, new RexNodeJsonDeserializer());
-        // We need this explicit mapping to make sure Jackson can deserialize POJOs declaring fields
-        // with RexLiteral instead of RexNode.
         module.addDeserializer(RexLiteral.class, (StdDeserializer) new RexNodeJsonDeserializer());
         module.addDeserializer(AggregateCall.class, new AggregateCallJsonDeserializer());
         module.addDeserializer(ChangelogMode.class, new ChangelogModeJsonDeserializer());
@@ -187,6 +181,8 @@ public class JsonSerdeUtil {
                 ResolvedCatalogTable.class, new ResolvedCatalogTableJsonDeserializer());
         module.addDeserializer(ResolvedExpression.class, new ResolvedExpressionJsonDeserializer());
         module.addDeserializer(ResolvedSchema.class, new ResolvedSchemaJsonDeserializer());
+        module.addDeserializer(
+                RequiredDistribution.class, new RequiredDistributionJsonDeserializer());
     }
 
     private static void registerMixins(SimpleModule module) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonDeserializer.java
index df48ca4..99ee1e9 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonDeserializer.java
@@ -97,10 +97,10 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJs
  * @see LogicalTypeJsonSerializer for the reverse operation
  */
 @Internal
-public class LogicalTypeJsonDeserializer extends StdDeserializer<LogicalType> {
+final class LogicalTypeJsonDeserializer extends StdDeserializer<LogicalType> {
     private static final long serialVersionUID = 1L;
 
-    public LogicalTypeJsonDeserializer() {
+    LogicalTypeJsonDeserializer() {
         super(LogicalType.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerializer.java
index 8507984..390ef6e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerializer.java
@@ -64,56 +64,56 @@ import java.io.IOException;
  * @see LogicalTypeJsonDeserializer for the reverse operation.
  */
 @Internal
-public final class LogicalTypeJsonSerializer extends StdSerializer<LogicalType> {
+final class LogicalTypeJsonSerializer extends StdSerializer<LogicalType> {
     private static final long serialVersionUID = 1L;
 
     // Common fields
-    public static final String FIELD_NAME_TYPE_NAME = "type";
-    public static final String FIELD_NAME_NULLABLE = "nullable";
-    public static final String FIELD_NAME_DESCRIPTION = "description";
+    static final String FIELD_NAME_TYPE_NAME = "type";
+    static final String FIELD_NAME_NULLABLE = "nullable";
+    static final String FIELD_NAME_DESCRIPTION = "description";
 
     // CHAR, VARCHAR, BINARY, VARBINARY
-    public static final String FIELD_NAME_LENGTH = "length";
+    static final String FIELD_NAME_LENGTH = "length";
 
     // TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE
-    public static final String FIELD_NAME_PRECISION = "precision";
-    public static final String FIELD_NAME_TIMESTAMP_KIND = "kind";
+    static final String FIELD_NAME_PRECISION = "precision";
+    static final String FIELD_NAME_TIMESTAMP_KIND = "kind";
 
     // ARRAY, MULTISET
-    public static final String FIELD_NAME_ELEMENT_TYPE = "elementType";
+    static final String FIELD_NAME_ELEMENT_TYPE = "elementType";
 
     // MAP
-    public static final String FIELD_NAME_KEY_TYPE = "keyType";
-    public static final String FIELD_NAME_VALUE_TYPE = "valueType";
+    static final String FIELD_NAME_KEY_TYPE = "keyType";
+    static final String FIELD_NAME_VALUE_TYPE = "valueType";
 
     // ROW
-    public static final String FIELD_NAME_FIELDS = "fields";
-    public static final String FIELD_NAME_FIELD_NAME = "name";
-    public static final String FIELD_NAME_FIELD_TYPE = "fieldType";
-    public static final String FIELD_NAME_FIELD_DESCRIPTION = "description";
+    static final String FIELD_NAME_FIELDS = "fields";
+    static final String FIELD_NAME_FIELD_NAME = "name";
+    static final String FIELD_NAME_FIELD_TYPE = "fieldType";
+    static final String FIELD_NAME_FIELD_DESCRIPTION = "description";
 
     // DISTINCT_TYPE
-    public static final String FIELD_NAME_SOURCE_TYPE = "sourceType";
+    static final String FIELD_NAME_SOURCE_TYPE = "sourceType";
 
     // STRUCTURED_TYPE
-    public static final String FIELD_NAME_OBJECT_IDENTIFIER = "objectIdentifier";
-    public static final String FIELD_NAME_IMPLEMENTATION_CLASS = "implementationClass";
-    public static final String FIELD_NAME_ATTRIBUTES = "attributes";
-    public static final String FIELD_NAME_ATTRIBUTE_NAME = "name";
-    public static final String FIELD_NAME_ATTRIBUTE_TYPE = "attributeType";
-    public static final String FIELD_NAME_ATTRIBUTE_DESCRIPTION = "description";
-    public static final String FIELD_NAME_FINAL = "final";
-    public static final String FIELD_NAME_INSTANTIABLE = "instantiable";
-    public static final String FIELD_NAME_COMPARISON = "comparison";
-    public static final String FIELD_NAME_SUPER_TYPE = "superType";
+    static final String FIELD_NAME_OBJECT_IDENTIFIER = "objectIdentifier";
+    static final String FIELD_NAME_IMPLEMENTATION_CLASS = "implementationClass";
+    static final String FIELD_NAME_ATTRIBUTES = "attributes";
+    static final String FIELD_NAME_ATTRIBUTE_NAME = "name";
+    static final String FIELD_NAME_ATTRIBUTE_TYPE = "attributeType";
+    static final String FIELD_NAME_ATTRIBUTE_DESCRIPTION = "description";
+    static final String FIELD_NAME_FINAL = "final";
+    static final String FIELD_NAME_INSTANTIABLE = "instantiable";
+    static final String FIELD_NAME_COMPARISON = "comparison";
+    static final String FIELD_NAME_SUPER_TYPE = "superType";
 
     // RAW
-    public static final String FIELD_NAME_CLASS = "class";
-    public static final String FIELD_NAME_EXTERNAL_DATA_TYPE = "externalDataType";
-    public static final String FIELD_NAME_SPECIAL_SERIALIZER = "specialSerializer";
-    public static final String FIELD_VALUE_EXTERNAL_SERIALIZER_NULL = "NULL";
+    static final String FIELD_NAME_CLASS = "class";
+    static final String FIELD_NAME_EXTERNAL_DATA_TYPE = "externalDataType";
+    static final String FIELD_NAME_SPECIAL_SERIALIZER = "specialSerializer";
+    static final String FIELD_VALUE_EXTERNAL_SERIALIZER_NULL = "NULL";
 
-    public LogicalTypeJsonSerializer() {
+    LogicalTypeJsonSerializer() {
         super(LogicalType.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonDeserializer.java
index d6d9151..3a0282b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonDeserializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
@@ -54,13 +55,15 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalWindow
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalWindowJsonSerializer.KIND_TUMBLING;
 
 /**
- * JSON deserializer for {@link LogicalWindow}, refer to {@link LogicalWindowJsonSerializer} for
- * serializer.
+ * JSON deserializer for {@link LogicalWindow}.
+ *
+ * @see LogicalWindowJsonSerializer for the reverse operation
  */
-public class LogicalWindowJsonDeserializer extends StdDeserializer<LogicalWindow> {
+@Internal
+final class LogicalWindowJsonDeserializer extends StdDeserializer<LogicalWindow> {
     private static final long serialVersionUID = 1L;
 
-    public LogicalWindowJsonDeserializer() {
+    LogicalWindowJsonDeserializer() {
         super(LogicalWindow.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonSerializer.java
index 2195b5f..518e7d2 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalWindowJsonSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
@@ -39,31 +40,33 @@ import static org.apache.flink.table.planner.plan.utils.AggregateUtil.toDuration
 import static org.apache.flink.table.planner.plan.utils.AggregateUtil.toLong;
 
 /**
- * JSON serializer for {@link LogicalWindow}, refer to {@link LogicalWindowJsonDeserializer} for
- * deserializer.
+ * JSON serializer for {@link LogicalWindow}.
+ *
+ * @see LogicalTypeJsonDeserializer for the reverse operation
  */
-public class LogicalWindowJsonSerializer extends StdSerializer<LogicalWindow> {
+@Internal
+final class LogicalWindowJsonSerializer extends StdSerializer<LogicalWindow> {
     private static final long serialVersionUID = 1L;
 
-    public static final String FIELD_NAME_KIND = "kind";
-    public static final String KIND_TUMBLING = "TUMBLING";
-    public static final String KIND_SLIDING = "SLIDING";
-    public static final String KIND_SESSION = "SESSION";
+    static final String FIELD_NAME_KIND = "kind";
+    static final String KIND_TUMBLING = "TUMBLING";
+    static final String KIND_SLIDING = "SLIDING";
+    static final String KIND_SESSION = "SESSION";
 
-    public static final String FIELD_NAME_ALIAS = "alias";
-    public static final String FIELD_NAME_TIME_FIELD = "timeField";
-    public static final String FIELD_NAME_FIELD_NAME = "fieldName";
-    public static final String FIELD_NAME_FIELD_INDEX = "fieldIndex";
-    public static final String FIELD_NAME_INPUT_INDEX = "inputIndex";
-    public static final String FIELD_NAME_FIELD_TYPE = "fieldType";
+    static final String FIELD_NAME_ALIAS = "alias";
+    static final String FIELD_NAME_TIME_FIELD = "timeField";
+    static final String FIELD_NAME_FIELD_NAME = "fieldName";
+    static final String FIELD_NAME_FIELD_INDEX = "fieldIndex";
+    static final String FIELD_NAME_INPUT_INDEX = "inputIndex";
+    static final String FIELD_NAME_FIELD_TYPE = "fieldType";
 
-    public static final String FIELD_NAME_SIZE = "size";
-    public static final String FIELD_NAME_IS_TIME_WINDOW = "isTimeWindow";
+    static final String FIELD_NAME_SIZE = "size";
+    static final String FIELD_NAME_IS_TIME_WINDOW = "isTimeWindow";
 
-    public static final String FIELD_NAME_SLIDE = "slide";
-    public static final String FIELD_NAME_GAP = "gap";
+    static final String FIELD_NAME_SLIDE = "slide";
+    static final String FIELD_NAME_GAP = "gap";
 
-    public LogicalWindowJsonSerializer() {
+    LogicalWindowJsonSerializer() {
         super(LogicalWindow.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonDeserializer.java
index 0492d44..9285483 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonDeserializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
@@ -28,11 +29,16 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std
 
 import java.io.IOException;
 
-/** JSON deserializer for {@link ObjectIdentifier}. */
-public class ObjectIdentifierJsonDeserializer extends StdDeserializer<ObjectIdentifier> {
+/**
+ * JSON deserializer for {@link ObjectIdentifier}.
+ *
+ * @see ObjectIdentifierJsonSerializer for the reverse operation
+ */
+@Internal
+final class ObjectIdentifierJsonDeserializer extends StdDeserializer<ObjectIdentifier> {
     private static final long serialVersionUID = 1L;
 
-    public ObjectIdentifierJsonDeserializer() {
+    ObjectIdentifierJsonDeserializer() {
         super(ObjectIdentifier.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonSerializer.java
index d14e37d..fb23dcd 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -26,11 +27,16 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
 
 import java.io.IOException;
 
-/** JSON serializer for {@link ObjectIdentifier}. */
-public class ObjectIdentifierJsonSerializer extends StdSerializer<ObjectIdentifier> {
+/**
+ * JSON serializer for {@link ObjectIdentifier}.
+ *
+ * @see ObjectIdentifierJsonDeserializer for the reverse operation
+ */
+@Internal
+final class ObjectIdentifierJsonSerializer extends StdSerializer<ObjectIdentifier> {
     private static final long serialVersionUID = 1L;
 
-    public ObjectIdentifierJsonSerializer() {
+    ObjectIdentifierJsonSerializer() {
         super(ObjectIdentifier.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonDeserializer.java
index b1c8cd0..467fc9e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonDeserializer.java
@@ -38,10 +38,10 @@ import java.io.IOException;
  * @see RelDataTypeJsonSerializer for the reverse operation
  */
 @Internal
-public class RelDataTypeJsonDeserializer extends StdDeserializer<RelDataType> {
+final class RelDataTypeJsonDeserializer extends StdDeserializer<RelDataType> {
     private static final long serialVersionUID = 1L;
 
-    public RelDataTypeJsonDeserializer() {
+    RelDataTypeJsonDeserializer() {
         super(RelDataType.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerializer.java
index 5d76abb..da86cc5 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerializer.java
@@ -37,10 +37,10 @@ import java.io.IOException;
  * @see RelDataTypeJsonDeserializer for the reverse operation.
  */
 @Internal
-public final class RelDataTypeJsonSerializer extends StdSerializer<RelDataType> {
+final class RelDataTypeJsonSerializer extends StdSerializer<RelDataType> {
     private static final long serialVersionUID = 1L;
 
-    public RelDataTypeJsonSerializer() {
+    RelDataTypeJsonSerializer() {
         super(RelDataType.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonDeserializer.java
index 24db15a..cbbb7a6 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonDeserializer.java
@@ -18,30 +18,35 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty.DistributionType;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty.RequiredDistribution;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
 
 import java.io.IOException;
 
-/** JSON deserializer for {@link RequiredDistribution}. */
-public class RequiredDistributionJsonDeserializer extends StdDeserializer<RequiredDistribution> {
+/**
+ * JSON deserializer for {@link RequiredDistribution}.
+ *
+ * @see RelDataTypeJsonSerializer for the reverse operation
+ */
+@Internal
+final class RequiredDistributionJsonDeserializer extends StdDeserializer<RequiredDistribution> {
     private static final long serialVersionUID = 1L;
 
-    public RequiredDistributionJsonDeserializer() {
+    RequiredDistributionJsonDeserializer() {
         super(RequiredDistribution.class);
     }
 
     @Override
     public RequiredDistribution deserialize(JsonParser jsonParser, DeserializationContext ctx)
-            throws IOException, JsonProcessingException {
+            throws IOException {
         JsonNode jsonNode = jsonParser.getCodec().readTree(jsonParser);
         DistributionType type =
                 DistributionType.valueOf(jsonNode.get("type").asText().toUpperCase());
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonSerializer.java
index df47b80..1817557 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RequiredDistributionJsonSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty.DistributionType;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty.HashDistribution;
@@ -29,11 +30,16 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
 
 import java.io.IOException;
 
-/** JSON serializer for {@link RequiredDistribution}. */
-public class RequiredDistributionJsonSerializer extends StdSerializer<RequiredDistribution> {
+/**
+ * JSON serializer for {@link RequiredDistribution}.
+ *
+ * @see RequiredDistributionJsonDeserializer for the reverse operation
+ */
+@Internal
+final class RequiredDistributionJsonSerializer extends StdSerializer<RequiredDistribution> {
     private static final long serialVersionUID = 1L;
 
-    public RequiredDistributionJsonSerializer() {
+    RequiredDistributionJsonSerializer() {
         super(RequiredDistribution.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableJsonDeserializer.java
index b19a618..86ce77e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableJsonDeserializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
@@ -40,10 +41,16 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedCatal
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedCatalogTableJsonSerializer.PARTITION_KEYS;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedCatalogTableJsonSerializer.RESOLVED_SCHEMA;
 
-class ResolvedCatalogTableJsonDeserializer extends StdDeserializer<ResolvedCatalogTable> {
+/**
+ * JSON deserializer for {@link ResolvedCatalogTable}.
+ *
+ * @see ResolvedCatalogTableJsonSerializer for the reverse operation
+ */
+@Internal
+final class ResolvedCatalogTableJsonDeserializer extends StdDeserializer<ResolvedCatalogTable> {
     private static final long serialVersionUID = 1L;
 
-    public ResolvedCatalogTableJsonDeserializer() {
+    ResolvedCatalogTableJsonDeserializer() {
         super(ResolvedCatalogTable.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableJsonSerializer.java
index c088f57..25d9c2f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableJsonSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanCompilation;
@@ -31,17 +32,23 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
 
 import java.io.IOException;
 
-class ResolvedCatalogTableJsonSerializer extends StdSerializer<ResolvedCatalogTable> {
+/**
+ * JSON serializer for {@link ResolvedCatalogTable}.
+ *
+ * @see ResolvedCatalogTableJsonDeserializer for the reverse operation
+ */
+@Internal
+final class ResolvedCatalogTableJsonSerializer extends StdSerializer<ResolvedCatalogTable> {
     private static final long serialVersionUID = 1L;
 
     static final String SERIALIZE_OPTIONS = "serialize_options";
 
-    public static final String RESOLVED_SCHEMA = "schema";
-    public static final String PARTITION_KEYS = "partitionKeys";
-    public static final String OPTIONS = "options";
-    public static final String COMMENT = "comment";
+    static final String RESOLVED_SCHEMA = "schema";
+    static final String PARTITION_KEYS = "partitionKeys";
+    static final String OPTIONS = "options";
+    static final String COMMENT = "comment";
 
-    public ResolvedCatalogTableJsonSerializer() {
+    ResolvedCatalogTableJsonSerializer() {
         super(ResolvedCatalogTable.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedExpressionJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedExpressionJsonDeserializer.java
index f513ea9..2d6c1ee 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedExpressionJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedExpressionJsonDeserializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.expressions.ResolvedExpression;
@@ -41,9 +42,15 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedExpre
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedExpressionJsonSerializer.TYPE;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedExpressionJsonSerializer.TYPE_REX_NODE_EXPRESSION;
 
-class ResolvedExpressionJsonDeserializer extends StdDeserializer<ResolvedExpression> {
+/**
+ * JSON deserializer for {@link ResolvedExpression}.
+ *
+ * @see ResolvedExpressionJsonSerializer for the reverse operation
+ */
+@Internal
+final class ResolvedExpressionJsonDeserializer extends StdDeserializer<ResolvedExpression> {
 
-    protected ResolvedExpressionJsonDeserializer() {
+    ResolvedExpressionJsonDeserializer() {
         super(ResolvedExpression.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedExpressionJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedExpressionJsonSerializer.java
index 9dfa27d..b1c2009 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedExpressionJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedExpressionJsonSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.planner.expressions.RexNodeExpression;
@@ -28,14 +29,20 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
 
 import java.io.IOException;
 
-class ResolvedExpressionJsonSerializer extends StdSerializer<ResolvedExpression> {
+/**
+ * JSON serializer for {@link ResolvedExpression}.
+ *
+ * @see ResolvedExpressionJsonDeserializer for the reverse operation
+ */
+@Internal
+final class ResolvedExpressionJsonSerializer extends StdSerializer<ResolvedExpression> {
 
-    public static final String TYPE = "type";
-    public static final String TYPE_REX_NODE_EXPRESSION = "rexNodeExpression";
-    public static final String REX_NODE = "rexNode";
-    public static final String SERIALIZABLE_STRING = "serializableString";
+    static final String TYPE = "type";
+    static final String TYPE_REX_NODE_EXPRESSION = "rexNodeExpression";
+    static final String REX_NODE = "rexNode";
+    static final String SERIALIZABLE_STRING = "serializableString";
 
-    protected ResolvedExpressionJsonSerializer() {
+    ResolvedExpressionJsonSerializer() {
         super(ResolvedExpression.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonDeserializer.java
index 94e20ad..630ce93 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonDeserializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UniqueConstraint;
@@ -37,10 +38,16 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedSchem
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedSchemaJsonSerializer.PRIMARY_KEY;
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedSchemaJsonSerializer.WATERMARK_SPECS;
 
-class ResolvedSchemaJsonDeserializer extends StdDeserializer<ResolvedSchema> {
+/**
+ * JSON deserializer for {@link ResolvedSchema}.
+ *
+ * @see ResolvedSchemaJsonSerializer for the reverse operation
+ */
+@Internal
+final class ResolvedSchemaJsonDeserializer extends StdDeserializer<ResolvedSchema> {
     private static final long serialVersionUID = 1L;
 
-    public ResolvedSchemaJsonDeserializer() {
+    ResolvedSchemaJsonDeserializer() {
         super(ResolvedSchema.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonSerializer.java
index da8bb8b..393d5ab 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.ResolvedSchema;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -28,14 +29,20 @@ import java.io.IOException;
 
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.serializeOptionalField;
 
-class ResolvedSchemaJsonSerializer extends StdSerializer<ResolvedSchema> {
+/**
+ * JSON serializer for {@link ResolvedSchema}.
+ *
+ * @see ResolvedSchemaJsonDeserializer for the reverse operation
+ */
+@Internal
+final class ResolvedSchemaJsonSerializer extends StdSerializer<ResolvedSchema> {
     private static final long serialVersionUID = 1L;
 
-    public static final String COLUMNS = "columns";
-    public static final String WATERMARK_SPECS = "watermarkSpecs";
-    public static final String PRIMARY_KEY = "primaryKey";
+    static final String COLUMNS = "columns";
+    static final String WATERMARK_SPECS = "watermarkSpecs";
+    static final String PRIMARY_KEY = "primaryKey";
 
-    public ResolvedSchemaJsonSerializer() {
+    ResolvedSchemaJsonSerializer() {
         super(ResolvedSchema.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
index ff9e322..8e52495 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java
@@ -110,10 +110,10 @@ import static org.apache.flink.table.planner.typeutils.SymbolUtil.serializableTo
  * @see RexNodeJsonSerializer for the reverse operation
  */
 @Internal
-public class RexNodeJsonDeserializer extends StdDeserializer<RexNode> {
+final class RexNodeJsonDeserializer extends StdDeserializer<RexNode> {
     private static final long serialVersionUID = 1L;
 
-    public RexNodeJsonDeserializer() {
+    RexNodeJsonDeserializer() {
         super(RexNode.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
index d38f08a..7e38317 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
@@ -72,53 +72,53 @@ import static org.apache.flink.table.planner.typeutils.SymbolUtil.calciteToSeria
  * @see RexNodeJsonDeserializer for the reverse operation
  */
 @Internal
-public class RexNodeJsonSerializer extends StdSerializer<RexNode> {
+final class RexNodeJsonSerializer extends StdSerializer<RexNode> {
     private static final long serialVersionUID = 1L;
 
     // Common fields
-    public static final String FIELD_NAME_KIND = "kind";
-    public static final String FIELD_NAME_VALUE = "value";
-    public static final String FIELD_NAME_TYPE = "type";
-    public static final String FIELD_NAME_NAME = "name";
+    static final String FIELD_NAME_KIND = "kind";
+    static final String FIELD_NAME_VALUE = "value";
+    static final String FIELD_NAME_TYPE = "type";
+    static final String FIELD_NAME_NAME = "name";
 
     // INPUT_REF
-    public static final String KIND_INPUT_REF = "INPUT_REF";
-    public static final String FIELD_NAME_INPUT_INDEX = "inputIndex";
+    static final String KIND_INPUT_REF = "INPUT_REF";
+    static final String FIELD_NAME_INPUT_INDEX = "inputIndex";
 
     // LITERAL
-    public static final String KIND_LITERAL = "LITERAL";
+    static final String KIND_LITERAL = "LITERAL";
     // Sarg fields and values
-    public static final String FIELD_NAME_SARG = "sarg";
-    public static final String FIELD_NAME_RANGES = "ranges";
-    public static final String FIELD_NAME_BOUND_LOWER = "lower";
-    public static final String FIELD_NAME_BOUND_UPPER = "upper";
-    public static final String FIELD_NAME_BOUND_TYPE = "boundType";
-    public static final String FIELD_NAME_CONTAINS_NULL = "containsNull";
+    static final String FIELD_NAME_SARG = "sarg";
+    static final String FIELD_NAME_RANGES = "ranges";
+    static final String FIELD_NAME_BOUND_LOWER = "lower";
+    static final String FIELD_NAME_BOUND_UPPER = "upper";
+    static final String FIELD_NAME_BOUND_TYPE = "boundType";
+    static final String FIELD_NAME_CONTAINS_NULL = "containsNull";
     // Symbol fields
-    public static final String FIELD_NAME_SYMBOL = "symbol";
+    static final String FIELD_NAME_SYMBOL = "symbol";
 
     // FIELD_ACCESS
-    public static final String KIND_FIELD_ACCESS = "FIELD_ACCESS";
-    public static final String FIELD_NAME_EXPR = "expr";
+    static final String KIND_FIELD_ACCESS = "FIELD_ACCESS";
+    static final String FIELD_NAME_EXPR = "expr";
 
     // CORREL_VARIABLE
-    public static final String KIND_CORREL_VARIABLE = "CORREL_VARIABLE";
-    public static final String FIELD_NAME_CORREL = "correl";
+    static final String KIND_CORREL_VARIABLE = "CORREL_VARIABLE";
+    static final String FIELD_NAME_CORREL = "correl";
 
     // PATTERN_INPUT_REF
-    public static final String KIND_PATTERN_INPUT_REF = "PATTERN_INPUT_REF";
-    public static final String FIELD_NAME_ALPHA = "alpha";
+    static final String KIND_PATTERN_INPUT_REF = "PATTERN_INPUT_REF";
+    static final String FIELD_NAME_ALPHA = "alpha";
 
     // CALL
-    public static final String KIND_CALL = "CALL";
-    public static final String FIELD_NAME_OPERANDS = "operands";
-    public static final String FIELD_NAME_INTERNAL_NAME = "internalName";
-    public static final String FIELD_NAME_SYSTEM_NAME = "systemName";
-    public static final String FIELD_NAME_CATALOG_NAME = "catalogName";
-    public static final String FIELD_NAME_SYNTAX = "syntax";
-    public static final String FIELD_NAME_CLASS = "class";
-
-    public RexNodeJsonSerializer() {
+    static final String KIND_CALL = "CALL";
+    static final String FIELD_NAME_OPERANDS = "operands";
+    static final String FIELD_NAME_INTERNAL_NAME = "internalName";
+    static final String FIELD_NAME_SYSTEM_NAME = "systemName";
+    static final String FIELD_NAME_CATALOG_NAME = "catalogName";
+    static final String FIELD_NAME_SYNTAX = "syntax";
+    static final String FIELD_NAME_CLASS = "class";
+
+    RexNodeJsonSerializer() {
         super(RexNode.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonDeserializer.java
index 9f1b8e3..a663222 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonDeserializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
@@ -41,12 +42,14 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexWindowBoun
 import static org.apache.flink.table.planner.plan.nodes.exec.serde.RexWindowBoundJsonSerializer.KIND_UNBOUNDED_PRECEDING;
 
 /**
- * JSON deserializer for {@link RexWindowBound}. refer to {@link RexWindowBoundJsonSerializer} for
- * serializer.
+ * JSON deserializer for {@link RexWindowBound}.
+ *
+ * @see RexWindowBoundJsonSerializer for the reverse operation
  */
-public class RexWindowBoundJsonDeserializer extends StdDeserializer<RexWindowBound> {
+@Internal
+final class RexWindowBoundJsonDeserializer extends StdDeserializer<RexWindowBound> {
 
-    public RexWindowBoundJsonDeserializer() {
+    RexWindowBoundJsonDeserializer() {
         super(RexWindowBound.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonSerializer.java
index 021731c..658db94 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -29,25 +30,24 @@ import org.apache.calcite.rex.RexWindowBound;
 import java.io.IOException;
 
 /**
- * JSON serializer for {@link RexWindowBound}. refer to {@link RexWindowBoundJsonDeserializer} for
- * deserializer.
+ * JSON serializer for {@link RexWindowBound}.
  *
- * <p>Supports serialize CURRENT_ROW, UNBOUNDED_PRECEDING, UNBOUNDED_FOLLOWING, Preceding Bounded
- * Window and Following Bounded Window.
+ * @see RexWindowBoundJsonDeserializer for the reverse operation
  */
-public class RexWindowBoundJsonSerializer extends StdSerializer<RexWindowBound> {
+@Internal
+final class RexWindowBoundJsonSerializer extends StdSerializer<RexWindowBound> {
 
-    public static final String FIELD_NAME_KIND = "kind";
-    public static final String KIND_CURRENT_ROW = "CURRENT_ROW";
-    public static final String KIND_UNBOUNDED_PRECEDING = "UNBOUNDED_PRECEDING";
-    public static final String KIND_UNBOUNDED_FOLLOWING = "UNBOUNDED_FOLLOWING";
-    public static final String KIND_BOUNDED_WINDOW = "BOUNDED_WINDOW";
+    static final String FIELD_NAME_KIND = "kind";
+    static final String KIND_CURRENT_ROW = "CURRENT_ROW";
+    static final String KIND_UNBOUNDED_PRECEDING = "UNBOUNDED_PRECEDING";
+    static final String KIND_UNBOUNDED_FOLLOWING = "UNBOUNDED_FOLLOWING";
+    static final String KIND_BOUNDED_WINDOW = "BOUNDED_WINDOW";
 
-    public static final String FIELD_NAME_IS_PRECEDING = "isPreceding";
-    public static final String FIELD_NAME_IS_FOLLOWING = "isFollowing";
-    public static final String FIELD_NAME_OFFSET = "offset";
+    static final String FIELD_NAME_IS_PRECEDING = "isPreceding";
+    static final String FIELD_NAME_IS_FOLLOWING = "isFollowing";
+    static final String FIELD_NAME_OFFSET = "offset";
 
-    public RexWindowBoundJsonSerializer() {
+    RexWindowBoundJsonSerializer() {
         super(RexWindowBound.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SerdeContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SerdeContext.java
index daec5ba..1274132 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SerdeContext.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/SerdeContext.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.planner.calcite.FlinkContext;
@@ -33,6 +34,7 @@ import org.apache.calcite.sql.SqlOperatorTable;
 /**
  * A context to allow the store user-defined data within ExecNode serialization and deserialization.
  */
+@Internal
 public class SerdeContext {
     static final String SERDE_CONTEXT_KEY = "serdeCtx";
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ShuffleJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ShuffleJsonDeserializer.java
index 2a61222..80ff766 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ShuffleJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ShuffleJsonDeserializer.java
@@ -18,29 +18,34 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.Shuffle;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
 
 import java.io.IOException;
 
-/** JSON deserializer for {@link Shuffle}. */
-public class ShuffleJsonDeserializer extends StdDeserializer<Shuffle> {
+/**
+ * JSON deserializer for {@link Shuffle}.
+ *
+ * @see ShuffleJsonSerializer for the reverse operation
+ */
+@Internal
+final class ShuffleJsonDeserializer extends StdDeserializer<Shuffle> {
     private static final long serialVersionUID = 1L;
 
-    public ShuffleJsonDeserializer() {
+    ShuffleJsonDeserializer() {
         super(Shuffle.class);
     }
 
     @Override
     public Shuffle deserialize(JsonParser jsonParser, DeserializationContext ctx)
-            throws IOException, JsonProcessingException {
+            throws IOException {
         JsonNode jsonNode = jsonParser.getCodec().readTree(jsonParser);
         Shuffle.Type type = Shuffle.Type.valueOf(jsonNode.get("type").asText().toUpperCase());
         switch (type) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ShuffleJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ShuffleJsonSerializer.java
index 1607df4..5603186 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ShuffleJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ShuffleJsonSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.HashShuffle;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.Shuffle;
@@ -28,11 +29,16 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.S
 
 import java.io.IOException;
 
-/** JSON serializer for {@link Shuffle}. */
-public class ShuffleJsonSerializer extends StdSerializer<Shuffle> {
+/**
+ * JSON serializer for {@link Shuffle}.
+ *
+ * @see ShuffleJsonDeserializer for the reverse operation
+ */
+@Internal
+final class ShuffleJsonSerializer extends StdSerializer<Shuffle> {
     private static final long serialVersionUID = 1L;
 
-    public ShuffleJsonSerializer() {
+    ShuffleJsonSerializer() {
         super(Shuffle.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java
index 78a87829..6250157 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.Constraint.ConstraintType;
 import org.apache.flink.table.catalog.UniqueConstraint;
 
@@ -28,12 +29,13 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
 import java.util.List;
 
 /** Mixin for {@link UniqueConstraint}. */
+@Internal
 abstract class UniqueConstraintMixin {
 
-    public static final String NAME = "name";
-    public static final String ENFORCED = "enforced";
-    public static final String TYPE = "type";
-    public static final String COLUMNS = "columns";
+    static final String NAME = "name";
+    static final String ENFORCED = "enforced";
+    static final String TYPE = "type";
+    static final String COLUMNS = "columns";
 
     @JsonCreator
     private UniqueConstraintMixin(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WatermarkSpecMixin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WatermarkSpecMixin.java
index 26fa1c9..d914c48 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WatermarkSpecMixin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WatermarkSpecMixin.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.WatermarkSpec;
 import org.apache.flink.table.expressions.ResolvedExpression;
 
@@ -25,10 +26,11 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCre
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
 /** Mixin for {@link WatermarkSpec}. */
+@Internal
 abstract class WatermarkSpecMixin {
 
-    public static final String ROWTIME_ATTRIBUTE = "rowtimeAttribute";
-    public static final String EXPRESSION = "expression";
+    static final String ROWTIME_ATTRIBUTE = "rowtimeAttribute";
+    static final String EXPRESSION = "expression";
 
     @JsonCreator
     private WatermarkSpecMixin(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WindowReferenceJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WindowReferenceJsonDeserializer.java
index 2a03d5c..f4abacb 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WindowReferenceJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WindowReferenceJsonDeserializer.java
@@ -40,10 +40,10 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.WindowReferen
  */
 @Deprecated
 @Internal
-public final class WindowReferenceJsonDeserializer extends StdDeserializer<WindowReference> {
+final class WindowReferenceJsonDeserializer extends StdDeserializer<WindowReference> {
     private static final long serialVersionUID = 1L;
 
-    protected WindowReferenceJsonDeserializer() {
+    WindowReferenceJsonDeserializer() {
         super(WindowReference.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WindowReferenceJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WindowReferenceJsonSerializer.java
index 179a4cd..e1f296c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WindowReferenceJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/WindowReferenceJsonSerializer.java
@@ -35,13 +35,13 @@ import java.io.IOException;
  */
 @Deprecated
 @Internal
-public final class WindowReferenceJsonSerializer extends StdSerializer<WindowReference> {
+final class WindowReferenceJsonSerializer extends StdSerializer<WindowReference> {
     private static final long serialVersionUID = 1L;
 
-    public static final String FIELD_NAME_NAME = "name";
-    public static final String FIELD_NAME_TYPE = "type";
+    static final String FIELD_NAME_NAME = "name";
+    static final String FIELD_NAME_TYPE = "type";
 
-    protected WindowReferenceJsonSerializer() {
+    WindowReferenceJsonSerializer() {
         super(WindowReference.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/OverSpec.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/OverSpec.java
index 63d1fdc..c7ea182 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/OverSpec.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/OverSpec.java
@@ -18,15 +18,10 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.spec;
 
-import org.apache.flink.table.planner.plan.nodes.exec.serde.RexWindowBoundJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.RexWindowBoundJsonSerializer;
-
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rex.RexLiteral;
@@ -129,14 +124,10 @@ public class OverSpec {
 
         /** The lower bound of the window. */
         @JsonProperty(FIELD_NAME_LOWER_BOUND)
-        @JsonSerialize(using = RexWindowBoundJsonSerializer.class)
-        @JsonDeserialize(using = RexWindowBoundJsonDeserializer.class)
         private final RexWindowBound lowerBound;
 
         /** The upper bound of the window. */
         @JsonProperty(FIELD_NAME_UPPER_BOUND)
-        @JsonSerialize(using = RexWindowBoundJsonSerializer.class)
-        @JsonDeserialize(using = RexWindowBoundJsonDeserializer.class)
         private final RexWindowBound upperBound;
 
         /** The agg functions set. */
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
index 7b31d27..9e68aa5 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalGroupAggregate.java
@@ -35,8 +35,6 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
 import org.apache.flink.table.planner.plan.utils.AggregateUtil;
@@ -56,8 +54,6 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.tools.RelBuilder;
@@ -108,8 +104,6 @@ public class StreamExecGlobalGroupAggregate extends StreamExecAggregateBase {
 
     /** The input row type of this node's local agg. */
     @JsonProperty(FIELD_NAME_LOCAL_AGG_INPUT_ROW_TYPE)
-    @JsonSerialize(using = LogicalTypeJsonSerializer.class)
-    @JsonDeserialize(using = LogicalTypeJsonDeserializer.class)
     private final RowType localAggInputRowType;
 
     /** Whether this node will generate UPDATE_BEFORE messages. */
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
index b026708..d3c2710 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
@@ -34,8 +34,6 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
 import org.apache.flink.table.planner.plan.utils.AggregateUtil;
@@ -57,8 +55,6 @@ import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.tools.RelBuilder;
@@ -99,8 +95,6 @@ public class StreamExecGlobalWindowAggregate extends StreamExecWindowAggregateBa
 
     /** The input row type of this node's local agg. */
     @JsonProperty(FIELD_NAME_LOCAL_AGG_INPUT_ROW_TYPE)
-    @JsonSerialize(using = LogicalTypeJsonSerializer.class)
-    @JsonDeserialize(using = LogicalTypeJsonDeserializer.class)
     private final RowType localAggInputRowType;
 
     public StreamExecGlobalWindowAggregate(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java
index be4cd76..0e33097 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java
@@ -41,8 +41,6 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalWindowJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalWindowJsonSerializer;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
 import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
@@ -68,8 +66,6 @@ import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.tools.RelBuilder;
@@ -133,8 +129,6 @@ public class StreamExecGroupWindowAggregate extends StreamExecAggregateBase {
     private final AggregateCall[] aggCalls;
 
     @JsonProperty(FIELD_NAME_WINDOW)
-    @JsonSerialize(using = LogicalWindowJsonSerializer.class)
-    @JsonDeserialize(using = LogicalWindowJsonDeserializer.class)
     private final LogicalWindow window;
 
     @JsonProperty(FIELD_NAME_NAMED_WINDOW_PROPERTIES)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
index e6e354d..9322899 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
@@ -32,8 +32,6 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.LogicalTypeJsonSerializer;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
 import org.apache.flink.table.planner.plan.utils.AggregateUtil;
@@ -49,8 +47,6 @@ import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.tools.RelBuilder;
@@ -107,8 +103,6 @@ public class StreamExecIncrementalGroupAggregate extends StreamExecAggregateBase
 
     /** The input row type of this node's partial local agg. */
     @JsonProperty(FIELD_NAME_PARTIAL_LOCAL_AGG_INPUT_TYPE)
-    @JsonSerialize(using = LogicalTypeJsonSerializer.class)
-    @JsonDeserialize(using = LogicalTypeJsonDeserializer.class)
     private final RowType partialLocalAggInputType;
 
     /** Whether this node consumes retraction messages. */
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
index 0577a51..2334159 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
@@ -33,8 +33,6 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.ChangelogModeJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.ChangelogModeJsonSerializer;
 import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec;
 import org.apache.flink.table.runtime.typeutils.TypeCheckUtils;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -43,8 +41,6 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -80,8 +76,6 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
     public static final String FIELD_NAME_REQUIRE_UPSERT_MATERIALIZE = "requireUpsertMaterialize";
 
     @JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODE)
-    @JsonSerialize(using = ChangelogModeJsonSerializer.class)
-    @JsonDeserialize(using = ChangelogModeJsonDeserializer.class)
     private final ChangelogMode inputChangelogMode;
 
     @JsonProperty(FIELD_NAME_REQUIRE_UPSERT_MATERIALIZE)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
index 2b973dc6..0353338 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/LookupJoinUtil.java
@@ -24,8 +24,6 @@ import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.TableFunctionProvider;
 import org.apache.flink.table.functions.UserDefinedFunction;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonDeserializer;
-import org.apache.flink.table.planner.plan.nodes.exec.serde.RexNodeJsonSerializer;
 import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable;
 import org.apache.flink.table.planner.plan.schema.TableSourceTable;
 import org.apache.flink.table.runtime.connector.source.LookupRuntimeProviderContext;
@@ -38,8 +36,6 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.rex.RexLiteral;
@@ -77,8 +73,6 @@ public final class LookupJoinUtil {
         public final LogicalType sourceType;
 
         @JsonProperty(FIELD_NAME_LITERAL)
-        @JsonSerialize(using = RexNodeJsonSerializer.class)
-        @JsonDeserialize(using = RexNodeJsonDeserializer.class)
         public final RexLiteral literal;
 
         @JsonCreator