You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2022/01/14 08:14:57 UTC

[flink] 01/02: [FLINK-25230][table-planner] Replace RelDataType with LogicalType serialization

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 75992f495192b84cfe406d7e24a1188c7cb284b0
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed Jan 12 14:34:43 2022 +0100

    [FLINK-25230][table-planner] Replace RelDataType with LogicalType serialization
---
 .../exec/serde/AggregateCallJsonSerializer.java    |   2 +-
 .../exec/serde/RelDataTypeJsonDeserializer.java    | 173 +-----
 .../exec/serde/RelDataTypeJsonSerializer.java      | 146 +----
 .../nodes/exec/serde/RexNodeJsonSerializer.java    |  74 ++-
 .../exec/serde/RexWindowBoundJsonSerializer.java   |   3 +-
 .../planner/plan/schema/StructuredRelDataType.java |   2 +-
 .../typeutils/LogicalRelDataTypeConverter.java     | 649 +++++++++++++++++++++
 .../nodes/exec/serde/DataTypeJsonSerdeTest.java    |  51 +-
 .../serde/DynamicTableSourceSpecSerdeTest.java     |   3 +
 .../plan/nodes/exec/serde/JsonSerdeMocks.java      |  76 +++
 .../nodes/exec/serde/LogicalTypeJsonSerdeTest.java |  45 +-
 .../nodes/exec/serde/RelDataTypeJsonSerdeTest.java | 219 +++----
 .../plan/nodes/exec/serde/RexNodeSerdeTest.java    | 103 +---
 .../nodes/exec/serde/RexWindowBoundSerdeTest.java  |   3 +-
 .../serde/TemporalTableSourceSpecSerdeTest.java    |   8 +-
 .../typeutils/LogicalRelDataTypeConverterTest.java | 215 +++++++
 16 files changed, 1157 insertions(+), 615 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java
index 92c85b4..27f2549 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AggregateCallJsonSerializer.java
@@ -83,7 +83,7 @@ public class AggregateCallJsonSerializer extends StdSerializer<AggregateCall> {
         jsonGenerator.writeBooleanField(FIELD_NAME_DISTINCT, aggCall.isDistinct());
         jsonGenerator.writeBooleanField(FIELD_NAME_APPROXIMATE, aggCall.isApproximate());
         jsonGenerator.writeBooleanField(FIELD_NAME_IGNORE_NULLS, aggCall.ignoreNulls());
-        jsonGenerator.writeObjectField(FIELD_NAME_TYPE, aggCall.getType());
+        serializerProvider.defaultSerializeField(FIELD_NAME_TYPE, aggCall.getType(), jsonGenerator);
         jsonGenerator.writeEndObject();
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonDeserializer.java
index 1476e41..6b35780 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonDeserializer.java
@@ -18,57 +18,26 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.TableException;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.typeutils.LogicalRelDataTypeConverter;
 import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RawType;
-import org.apache.flink.table.types.logical.StructuredType;
-import org.apache.flink.table.types.logical.TimestampKind;
-import org.apache.flink.table.types.logical.TypeInformationRawType;
-import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
-import org.apache.flink.table.utils.EncodingUtils;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.ObjectCodec;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
 
-import org.apache.calcite.avatica.util.TimeUnit;
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.StructKind;
-import org.apache.calcite.sql.SqlIntervalQualifier;
-import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.util.Util;
 
 import java.io.IOException;
 
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_ELEMENT;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_FIELDS;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_FILED_NAME;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_KEY;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_NULLABLE;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_PRECISION;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_RAW_TYPE;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_SCALE;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_STRUCTURED_TYPE;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_STRUCT_KIND;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_TIMESTAMP_KIND;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_TYPE_INFO;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_TYPE_NAME;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.RelDataTypeJsonSerializer.FIELD_NAME_VALUE;
-import static org.apache.flink.util.Preconditions.checkArgument;
-
 /**
- * JSON deserializer for {@link RelDataType}. refer to {@link RelDataTypeJsonSerializer} for
- * serializer.
+ * JSON deserializer for {@link RelDataType}.
+ *
+ * @see RelDataTypeJsonSerializer for the reverse operation
  */
+@Internal
 public class RelDataTypeJsonDeserializer extends StdDeserializer<RelDataType> {
     private static final long serialVersionUID = 1L;
 
@@ -79,129 +48,11 @@ public class RelDataTypeJsonDeserializer extends StdDeserializer<RelDataType> {
     @Override
     public RelDataType deserialize(JsonParser jsonParser, DeserializationContext ctx)
             throws IOException {
-        JsonNode jsonNode = jsonParser.readValueAsTree();
-        return deserialize(jsonNode, jsonParser.getCodec(), ctx);
-    }
-
-    private RelDataType deserialize(
-            JsonNode jsonNode, ObjectCodec codec, DeserializationContext ctx) throws IOException {
-        SerdeContext serdeContext = SerdeContext.get(ctx);
-        FlinkTypeFactory typeFactory = serdeContext.getTypeFactory();
-        if (jsonNode instanceof ObjectNode) {
-            ObjectNode objectNode = (ObjectNode) jsonNode;
-            if (objectNode.has(FIELD_NAME_TIMESTAMP_KIND)) {
-                boolean nullable = objectNode.get(FIELD_NAME_NULLABLE).booleanValue();
-                String typeName = objectNode.get(FIELD_NAME_TYPE_NAME).textValue();
-                boolean isTimestampLtz =
-                        SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE.name().equals(typeName);
-                TimestampKind timestampKind =
-                        TimestampKind.valueOf(
-                                objectNode.get(FIELD_NAME_TIMESTAMP_KIND).asText().toUpperCase());
-                switch (timestampKind) {
-                    case ROWTIME:
-                        return typeFactory.createRowtimeIndicatorType(nullable, isTimestampLtz);
-                    case PROCTIME:
-                        return typeFactory.createProctimeIndicatorType(nullable);
-                    default:
-                        throw new TableException(timestampKind + " is not supported.");
-                }
-            } else if (objectNode.has(FIELD_NAME_STRUCTURED_TYPE)) {
-                JsonNode structuredTypeNode = objectNode.get(FIELD_NAME_STRUCTURED_TYPE);
-                LogicalType structuredType =
-                        ctx.readValue(structuredTypeNode.traverse(codec), LogicalType.class);
-                checkArgument(structuredType instanceof StructuredType);
-                return serdeContext.getTypeFactory().createFieldTypeFromLogicalType(structuredType);
-            } else if (objectNode.has(FIELD_NAME_STRUCT_KIND)) {
-                ArrayNode arrayNode = (ArrayNode) objectNode.get(FIELD_NAME_FIELDS);
-                RelDataTypeFactory.Builder builder = typeFactory.builder();
-                for (JsonNode node : arrayNode) {
-                    builder.add(
-                            node.get(FIELD_NAME_FILED_NAME).asText(),
-                            deserialize(node, codec, ctx));
-                }
-                StructKind structKind =
-                        StructKind.valueOf(
-                                objectNode.get(FIELD_NAME_STRUCT_KIND).asText().toUpperCase());
-                boolean nullable = objectNode.get(FIELD_NAME_NULLABLE).booleanValue();
-                return builder.kind(structKind).nullableRecord(nullable).build();
-            } else if (objectNode.has(FIELD_NAME_FIELDS)) {
-                JsonNode fields = objectNode.get(FIELD_NAME_FIELDS);
-                // Nested struct
-                return deserialize(fields, codec, ctx);
-            } else {
-                SqlTypeName sqlTypeName =
-                        Util.enumVal(
-                                SqlTypeName.class, objectNode.get(FIELD_NAME_TYPE_NAME).asText());
-                boolean nullable = objectNode.get(FIELD_NAME_NULLABLE).booleanValue();
-                if (SqlTypeName.INTERVAL_TYPES.contains(sqlTypeName)) {
-                    TimeUnit startUnit = sqlTypeName.getStartUnit();
-                    TimeUnit endUnit = sqlTypeName.getEndUnit();
-                    return typeFactory.createTypeWithNullability(
-                            typeFactory.createSqlIntervalType(
-                                    new SqlIntervalQualifier(
-                                            startUnit, endUnit, SqlParserPos.ZERO)),
-                            nullable);
-                }
-                if (sqlTypeName == SqlTypeName.OTHER && objectNode.has(FIELD_NAME_RAW_TYPE)) {
-                    RawType<?> rawType =
-                            (RawType<?>)
-                                    LogicalTypeParser.parse(
-                                            objectNode.get(FIELD_NAME_RAW_TYPE).asText(),
-                                            serdeContext.getClassLoader());
-                    return typeFactory.createTypeWithNullability(
-                            typeFactory.createFieldTypeFromLogicalType(rawType), nullable);
-                }
-                if (sqlTypeName == SqlTypeName.ANY && objectNode.has(FIELD_NAME_RAW_TYPE)) {
-                    JsonNode rawTypeNode = objectNode.get(FIELD_NAME_RAW_TYPE);
-                    boolean nullableOfTypeInfo =
-                            rawTypeNode.get(FIELD_NAME_NULLABLE).booleanValue();
-                    TypeInformation<?> typeInfo =
-                            EncodingUtils.decodeStringToObject(
-                                    rawTypeNode.get(FIELD_NAME_TYPE_INFO).asText(),
-                                    TypeInformation.class,
-                                    serdeContext.getClassLoader());
-                    TypeInformationRawType<?> rawType =
-                            new TypeInformationRawType<>(nullableOfTypeInfo, typeInfo);
-                    return typeFactory.createTypeWithNullability(
-                            typeFactory.createFieldTypeFromLogicalType(rawType), nullable);
-                }
-
-                Integer precision =
-                        objectNode.has(FIELD_NAME_PRECISION)
-                                ? objectNode.get(FIELD_NAME_PRECISION).intValue()
-                                : null;
-                Integer scale =
-                        objectNode.has(FIELD_NAME_SCALE)
-                                ? objectNode.get(FIELD_NAME_SCALE).intValue()
-                                : null;
-                final RelDataType type;
-                if (sqlTypeName == SqlTypeName.ARRAY) {
-                    RelDataType elementType =
-                            deserialize(objectNode.get(FIELD_NAME_ELEMENT), codec, ctx);
-                    type = typeFactory.createArrayType(elementType, -1);
-                } else if (sqlTypeName == SqlTypeName.MULTISET) {
-                    RelDataType elementType =
-                            deserialize(objectNode.get(FIELD_NAME_ELEMENT), codec, ctx);
-                    type = typeFactory.createMultisetType(elementType, -1);
-                } else if (sqlTypeName == SqlTypeName.MAP) {
-                    RelDataType keyType = deserialize(objectNode.get(FIELD_NAME_KEY), codec, ctx);
-                    RelDataType valueType =
-                            deserialize(objectNode.get(FIELD_NAME_VALUE), codec, ctx);
-                    type = typeFactory.createMapType(keyType, valueType);
-                } else if (precision == null) {
-                    type = typeFactory.createSqlType(sqlTypeName);
-                } else if (scale == null) {
-                    type = typeFactory.createSqlType(sqlTypeName, precision);
-                } else {
-                    type = typeFactory.createSqlType(sqlTypeName, precision, scale);
-                }
-                return typeFactory.createTypeWithNullability(type, nullable);
-            }
-        } else if (jsonNode instanceof TextNode) {
-            SqlTypeName sqlTypeName = Util.enumVal(SqlTypeName.class, jsonNode.asText());
-            return typeFactory.createSqlType(sqlTypeName);
-        } else {
-            throw new TableException("Unknown type: " + jsonNode.toPrettyString());
-        }
+        final JsonNode logicalTypeNode = jsonParser.readValueAsTree();
+        final SerdeContext serdeContext = SerdeContext.get(ctx);
+        final FlinkTypeFactory typeFactory = serdeContext.getTypeFactory();
+        final LogicalType logicalType =
+                LogicalTypeJsonDeserializer.deserialize(logicalTypeNode, serdeContext);
+        return LogicalRelDataTypeConverter.toRelDataType(logicalType, typeFactory);
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerializer.java
index 01f500c..5d76abb 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerializer.java
@@ -18,49 +18,28 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
-import org.apache.flink.table.planner.plan.schema.GenericRelDataType;
-import org.apache.flink.table.planner.plan.schema.RawRelDataType;
-import org.apache.flink.table.planner.plan.schema.StructuredRelDataType;
-import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType;
-import org.apache.flink.table.types.logical.TimestampKind;
-import org.apache.flink.table.types.logical.TypeInformationRawType;
-import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.planner.typeutils.LogicalRelDataTypeConverter;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
 
 import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.sql.type.ArraySqlType;
-import org.apache.calcite.sql.type.MapSqlType;
-import org.apache.calcite.sql.type.MultisetSqlType;
-import org.apache.calcite.sql.type.SqlTypeName;
 
 import java.io.IOException;
 
 /**
- * JSON serializer for {@link RelDataType}. refer to {@link RelDataTypeJsonDeserializer} for
- * deserializer.
+ * JSON serializer for {@link RelDataType}.
+ *
+ * @see RelDataTypeJsonDeserializer for the reverse operation.
  */
-public class RelDataTypeJsonSerializer extends StdSerializer<RelDataType> {
+@Internal
+public final class RelDataTypeJsonSerializer extends StdSerializer<RelDataType> {
     private static final long serialVersionUID = 1L;
 
-    public static final String FIELD_NAME_TYPE_NAME = "typeName";
-    public static final String FIELD_NAME_FILED_NAME = "fieldName";
-    public static final String FIELD_NAME_NULLABLE = "nullable";
-    public static final String FIELD_NAME_PRECISION = "precision";
-    public static final String FIELD_NAME_SCALE = "scale";
-    public static final String FIELD_NAME_FIELDS = "fields";
-    public static final String FIELD_NAME_STRUCT_KIND = "structKind";
-    public static final String FIELD_NAME_TIMESTAMP_KIND = "timestampKind";
-    public static final String FIELD_NAME_ELEMENT = "element";
-    public static final String FIELD_NAME_KEY = "key";
-    public static final String FIELD_NAME_VALUE = "value";
-    public static final String FIELD_NAME_TYPE_INFO = "typeInfo";
-    public static final String FIELD_NAME_RAW_TYPE = "rawType";
-    public static final String FIELD_NAME_STRUCTURED_TYPE = "structuredType";
-
     public RelDataTypeJsonSerializer() {
         super(RelDataType.class);
     }
@@ -71,103 +50,14 @@ public class RelDataTypeJsonSerializer extends StdSerializer<RelDataType> {
             JsonGenerator jsonGenerator,
             SerializerProvider serializerProvider)
             throws IOException {
-        jsonGenerator.writeStartObject();
-        serializeInternal(relDataType, jsonGenerator, serializerProvider);
-        jsonGenerator.writeEndObject();
-    }
-
-    private void serializeInternal(
-            RelDataType relDataType, JsonGenerator gen, SerializerProvider serializerProvider)
-            throws IOException {
-        if (relDataType instanceof TimeIndicatorRelDataType) {
-            TimeIndicatorRelDataType timeIndicatorType = (TimeIndicatorRelDataType) relDataType;
-            gen.writeStringField(
-                    FIELD_NAME_TIMESTAMP_KIND,
-                    timeIndicatorType.isEventTime()
-                            ? TimestampKind.ROWTIME.name()
-                            : TimestampKind.PROCTIME.name());
-            gen.writeStringField(
-                    FIELD_NAME_TYPE_NAME, timeIndicatorType.originalType().getSqlTypeName().name());
-            gen.writeBooleanField(FIELD_NAME_NULLABLE, relDataType.isNullable());
-        } else if (relDataType instanceof StructuredRelDataType) {
-            StructuredRelDataType structuredType = (StructuredRelDataType) relDataType;
-            serializerProvider.defaultSerializeField(
-                    FIELD_NAME_STRUCTURED_TYPE, structuredType.getStructuredType(), gen);
-        } else if (relDataType.isStruct()) {
-            gen.writeStringField(FIELD_NAME_STRUCT_KIND, relDataType.getStructKind().name());
-            gen.writeBooleanField(FIELD_NAME_NULLABLE, relDataType.isNullable());
-
-            gen.writeFieldName(FIELD_NAME_FIELDS);
-            gen.writeStartArray();
-            for (RelDataTypeField field : relDataType.getFieldList()) {
-                gen.writeStartObject();
-                serializeInternal(field.getType(), gen, serializerProvider);
-                gen.writeStringField(FIELD_NAME_FILED_NAME, field.getName());
-                gen.writeEndObject();
-            }
-            gen.writeEndArray();
-        } else if (relDataType.getSqlTypeName() == SqlTypeName.ARRAY) {
-            serializeCommon(relDataType, gen);
-            ArraySqlType arraySqlType = (ArraySqlType) relDataType;
-
-            gen.writeFieldName(FIELD_NAME_ELEMENT);
-            gen.writeStartObject();
-            serializeInternal(arraySqlType.getComponentType(), gen, serializerProvider);
-            gen.writeEndObject();
-        } else if (relDataType.getSqlTypeName() == SqlTypeName.MULTISET) {
-            assert relDataType instanceof MultisetSqlType;
-            serializeCommon(relDataType, gen);
-            MultisetSqlType multisetSqlType = (MultisetSqlType) relDataType;
-
-            gen.writeFieldName(FIELD_NAME_ELEMENT);
-            gen.writeStartObject();
-            serializeInternal(multisetSqlType.getComponentType(), gen, serializerProvider);
-            gen.writeEndObject();
-        } else if (relDataType.getSqlTypeName() == SqlTypeName.MAP) {
-            assert relDataType instanceof MapSqlType;
-            serializeCommon(relDataType, gen);
-            MapSqlType mapSqlType = (MapSqlType) relDataType;
-
-            gen.writeFieldName(FIELD_NAME_KEY);
-            gen.writeStartObject();
-            serializeInternal(mapSqlType.getKeyType(), gen, serializerProvider);
-            gen.writeEndObject();
-
-            gen.writeFieldName(FIELD_NAME_VALUE);
-            gen.writeStartObject();
-            serializeInternal(mapSqlType.getValueType(), gen, serializerProvider);
-            gen.writeEndObject();
-        } else if (relDataType instanceof GenericRelDataType) {
-            assert relDataType.getSqlTypeName() == SqlTypeName.ANY;
-            serializeCommon(relDataType, gen);
-            TypeInformationRawType<?> rawType = ((GenericRelDataType) relDataType).genericType();
-
-            gen.writeFieldName(FIELD_NAME_RAW_TYPE);
-            gen.writeStartObject();
-            gen.writeBooleanField(FIELD_NAME_NULLABLE, rawType.isNullable());
-            gen.writeStringField(
-                    FIELD_NAME_TYPE_INFO,
-                    EncodingUtils.encodeObjectToString(rawType.getTypeInformation()));
-            gen.writeEndObject();
-        } else if (relDataType instanceof RawRelDataType) {
-            assert relDataType.getSqlTypeName() == SqlTypeName.OTHER;
-            serializeCommon(relDataType, gen);
-            RawRelDataType rawType = (RawRelDataType) relDataType;
-            gen.writeStringField(FIELD_NAME_RAW_TYPE, rawType.getRawType().asSerializableString());
-        } else {
-            serializeCommon(relDataType, gen);
-        }
-    }
-
-    private void serializeCommon(RelDataType relDataType, JsonGenerator gen) throws IOException {
-        final SqlTypeName typeName = relDataType.getSqlTypeName();
-        gen.writeStringField(FIELD_NAME_TYPE_NAME, typeName.name());
-        gen.writeBooleanField(FIELD_NAME_NULLABLE, relDataType.isNullable());
-        if (relDataType.getSqlTypeName().allowsPrec()) {
-            gen.writeNumberField(FIELD_NAME_PRECISION, relDataType.getPrecision());
-        }
-        if (relDataType.getSqlTypeName().allowsScale()) {
-            gen.writeNumberField(FIELD_NAME_SCALE, relDataType.getScale());
-        }
+        final SerdeContext serdeContext = SerdeContext.get(serializerProvider);
+        final DataTypeFactory dataTypeFactory =
+                serdeContext.getFlinkContext().getCatalogManager().getDataTypeFactory();
+        // Conversion to LogicalType also ensures that Calcite's type system is materialized
+        // so data types like DECIMAL will receive a concrete precision and scale (not unspecified
+        // anymore).
+        final LogicalType logicalType =
+                LogicalRelDataTypeConverter.toLogicalType(relDataType, dataTypeFactory);
+        serializerProvider.defaultSerializeValue(logicalType, jsonGenerator);
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
index f16e805..427e488 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerializer.java
@@ -106,52 +106,63 @@ public class RexNodeJsonSerializer extends StdSerializer<RexNode> {
         switch (rexNode.getKind()) {
             case INPUT_REF:
             case TABLE_INPUT_REF:
-                serialize((RexInputRef) rexNode, jsonGenerator);
+                serialize((RexInputRef) rexNode, jsonGenerator, serializerProvider);
                 break;
             case LITERAL:
-                serialize((RexLiteral) rexNode, jsonGenerator);
+                serialize((RexLiteral) rexNode, jsonGenerator, serializerProvider);
                 break;
             case FIELD_ACCESS:
-                serialize((RexFieldAccess) rexNode, jsonGenerator);
+                serialize((RexFieldAccess) rexNode, jsonGenerator, serializerProvider);
                 break;
             case CORREL_VARIABLE:
-                serialize((RexCorrelVariable) rexNode, jsonGenerator);
+                serialize((RexCorrelVariable) rexNode, jsonGenerator, serializerProvider);
                 break;
             case PATTERN_INPUT_REF:
-                serialize((RexPatternFieldRef) rexNode, jsonGenerator);
+                serialize((RexPatternFieldRef) rexNode, jsonGenerator, serializerProvider);
                 break;
             default:
                 if (rexNode instanceof RexCall) {
-                    serialize((RexCall) rexNode, jsonGenerator);
+                    serialize((RexCall) rexNode, jsonGenerator, serializerProvider);
                 } else {
                     throw new TableException("Unknown RexNode: " + rexNode);
                 }
         }
     }
 
-    private void serialize(RexPatternFieldRef inputRef, JsonGenerator gen) throws IOException {
+    private void serialize(
+            RexPatternFieldRef inputRef, JsonGenerator gen, SerializerProvider serializerProvider)
+            throws IOException {
         gen.writeStartObject();
         gen.writeStringField(FIELD_NAME_KIND, SQL_KIND_PATTERN_INPUT_REF);
         gen.writeStringField(FIELD_NAME_ALPHA, inputRef.getAlpha());
         gen.writeNumberField(FIELD_NAME_INPUT_INDEX, inputRef.getIndex());
-        gen.writeObjectField(FIELD_NAME_TYPE, inputRef.getType());
+        serializerProvider.defaultSerializeField(FIELD_NAME_TYPE, inputRef.getType(), gen);
         gen.writeEndObject();
     }
 
-    private void serialize(RexInputRef inputRef, JsonGenerator gen) throws IOException {
+    private void serialize(
+            RexInputRef inputRef, JsonGenerator gen, SerializerProvider serializerProvider)
+            throws IOException {
         gen.writeStartObject();
         gen.writeStringField(FIELD_NAME_KIND, SQL_KIND_INPUT_REF);
         gen.writeNumberField(FIELD_NAME_INPUT_INDEX, inputRef.getIndex());
-        gen.writeObjectField(FIELD_NAME_TYPE, inputRef.getType());
+        serializerProvider.defaultSerializeField(FIELD_NAME_TYPE, inputRef.getType(), gen);
         gen.writeEndObject();
     }
 
-    private void serialize(RexLiteral literal, JsonGenerator gen) throws IOException {
+    private void serialize(
+            RexLiteral literal, JsonGenerator gen, SerializerProvider serializerProvider)
+            throws IOException {
         gen.writeStartObject();
         gen.writeStringField(FIELD_NAME_KIND, SQL_KIND_LITERAL);
         Comparable<?> value = literal.getValueAs(Comparable.class);
-        serialize(value, literal.getTypeName(), literal.getType().getSqlTypeName(), gen);
-        gen.writeObjectField(FIELD_NAME_TYPE, literal.getType());
+        serialize(
+                value,
+                literal.getTypeName(),
+                literal.getType().getSqlTypeName(),
+                gen,
+                serializerProvider);
+        serializerProvider.defaultSerializeField(FIELD_NAME_TYPE, literal.getType(), gen);
         gen.writeEndObject();
     }
 
@@ -160,7 +171,8 @@ public class RexNodeJsonSerializer extends StdSerializer<RexNode> {
             Comparable<?> value,
             SqlTypeName literalTypeName,
             SqlTypeName elementTypeName,
-            JsonGenerator gen)
+            JsonGenerator gen,
+            SerializerProvider serializerProvider)
             throws IOException {
         if (value == null) {
             gen.writeNullField(FIELD_NAME_VALUE);
@@ -229,14 +241,14 @@ public class RexNodeJsonSerializer extends StdSerializer<RexNode> {
                 gen.writeStringField(FIELD_NAME_CLASS, value.getClass().getName());
                 break;
             case SARG:
-                serialize((Sarg<?>) value, elementTypeName, gen);
+                serialize((Sarg<?>) value, elementTypeName, gen, serializerProvider);
                 break;
             case ROW:
             case MULTISET:
                 gen.writeFieldName(FIELD_NAME_VALUE);
                 gen.writeStartArray();
                 for (RexLiteral v : (FlatLists.ComparableList<RexLiteral>) value) {
-                    serialize(v, gen);
+                    serialize(v, gen, serializerProvider);
                 }
                 gen.writeEndArray();
                 break;
@@ -247,7 +259,11 @@ public class RexNodeJsonSerializer extends StdSerializer<RexNode> {
     }
 
     @SuppressWarnings("UnstableApiUsage")
-    private void serialize(Sarg<?> value, SqlTypeName sqlTypeName, JsonGenerator gen)
+    private void serialize(
+            Sarg<?> value,
+            SqlTypeName sqlTypeName,
+            JsonGenerator gen,
+            SerializerProvider serializerProvider)
             throws IOException {
         gen.writeFieldName(FIELD_NAME_SARG);
         gen.writeStartObject();
@@ -258,14 +274,14 @@ public class RexNodeJsonSerializer extends StdSerializer<RexNode> {
             if (range.hasLowerBound()) {
                 gen.writeFieldName(FIELD_NAME_BOUND_LOWER);
                 gen.writeStartObject();
-                serialize(range.lowerEndpoint(), sqlTypeName, sqlTypeName, gen);
+                serialize(range.lowerEndpoint(), sqlTypeName, sqlTypeName, gen, serializerProvider);
                 gen.writeStringField(FIELD_NAME_BOUND_TYPE, range.lowerBoundType().name());
                 gen.writeEndObject();
             }
             if (range.hasUpperBound()) {
                 gen.writeFieldName(FIELD_NAME_BOUND_UPPER);
                 gen.writeStartObject();
-                serialize(range.upperEndpoint(), sqlTypeName, sqlTypeName, gen);
+                serialize(range.upperEndpoint(), sqlTypeName, sqlTypeName, gen, serializerProvider);
                 gen.writeStringField(FIELD_NAME_BOUND_TYPE, range.upperBoundType().name());
                 gen.writeEndObject();
             }
@@ -276,23 +292,29 @@ public class RexNodeJsonSerializer extends StdSerializer<RexNode> {
         gen.writeEndObject();
     }
 
-    private void serialize(RexFieldAccess fieldAccess, JsonGenerator gen) throws IOException {
+    private void serialize(
+            RexFieldAccess fieldAccess, JsonGenerator gen, SerializerProvider serializerProvider)
+            throws IOException {
         gen.writeStartObject();
         gen.writeStringField(FIELD_NAME_KIND, SQL_KIND_FIELD_ACCESS);
         gen.writeStringField(FIELD_NAME_NAME, fieldAccess.getField().getName());
-        gen.writeObjectField(FIELD_NAME_EXPR, fieldAccess.getReferenceExpr());
+        serializerProvider.defaultSerializeField(
+                FIELD_NAME_EXPR, fieldAccess.getReferenceExpr(), gen);
         gen.writeEndObject();
     }
 
-    private void serialize(RexCorrelVariable variable, JsonGenerator gen) throws IOException {
+    private void serialize(
+            RexCorrelVariable variable, JsonGenerator gen, SerializerProvider serializerProvider)
+            throws IOException {
         gen.writeStartObject();
         gen.writeStringField(FIELD_NAME_KIND, SQL_KIND_CORREL_VARIABLE);
         gen.writeStringField(FIELD_NAME_CORREL, variable.getName());
-        gen.writeObjectField(FIELD_NAME_TYPE, variable.getType());
+        serializerProvider.defaultSerializeField(FIELD_NAME_TYPE, variable.getType(), gen);
         gen.writeEndObject();
     }
 
-    private void serialize(RexCall call, JsonGenerator gen) throws IOException {
+    private void serialize(RexCall call, JsonGenerator gen, SerializerProvider serializerProvider)
+            throws IOException {
         if (!call.getClass().isAssignableFrom(RexCall.class)) {
             throw new TableException("Unknown RexCall: " + call);
         }
@@ -302,10 +324,10 @@ public class RexNodeJsonSerializer extends StdSerializer<RexNode> {
         gen.writeFieldName(FIELD_NAME_OPERANDS);
         gen.writeStartArray();
         for (RexNode operand : call.getOperands()) {
-            gen.writeObject(operand);
+            serializerProvider.defaultSerializeValue(operand, gen);
         }
         gen.writeEndArray();
-        gen.writeObjectField(FIELD_NAME_TYPE, call.getType());
+        serializerProvider.defaultSerializeField(FIELD_NAME_TYPE, call.getType(), gen);
         gen.writeEndObject();
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonSerializer.java
index ac6dcc2..021731c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundJsonSerializer.java
@@ -75,7 +75,8 @@ public class RexWindowBoundJsonSerializer extends StdSerializer<RexWindowBound>
             } else {
                 throw new TableException("Unknown RexWindowBound: " + rexWindowBound);
             }
-            gen.writeObjectField(FIELD_NAME_OFFSET, rexWindowBound.getOffset());
+            serializerProvider.defaultSerializeField(
+                    FIELD_NAME_OFFSET, rexWindowBound.getOffset(), gen);
         }
         gen.writeEndObject();
     }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/StructuredRelDataType.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/StructuredRelDataType.java
index 78b5ec3..3021040 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/StructuredRelDataType.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/schema/StructuredRelDataType.java
@@ -54,7 +54,7 @@ public final class StructuredRelDataType extends ObjectSqlType {
 
     private final StructuredType structuredType;
 
-    private StructuredRelDataType(StructuredType structuredType, List<RelDataTypeField> fields) {
+    public StructuredRelDataType(StructuredType structuredType, List<RelDataTypeField> fields) {
         super(
                 SqlTypeName.STRUCTURED,
                 createSqlIdentifier(structuredType),
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/typeutils/LogicalRelDataTypeConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/typeutils/LogicalRelDataTypeConverter.java
new file mode 100644
index 0000000..5645031
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/typeutils/LogicalRelDataTypeConverter.java
@@ -0,0 +1,649 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.typeutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.schema.RawRelDataType;
+import org.apache.flink.table.planner.plan.schema.StructuredRelDataType;
+import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType.DayTimeResolution;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.SymbolType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType.YearMonthResolution;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.rel.type.StructKind;
+import org.apache.calcite.sql.SqlIntervalQualifier;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.BasicSqlType;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Symmetric converter between {@link LogicalType} and {@link RelDataType}.
+ *
+ * <p>This converter has many similarities with {@link FlinkTypeFactory} and might also replace it
+ * at some point. However, for now it is more consistent and does not lose information (i.e. for
+ * TIME(9) or interval types). It still delegates to {@link RelDataTypeFactory} but only for
+ * predefined/basic types.
+ *
+ * <p>Note: The conversion to {@link LogicalType} is not 100% symmetric and is currently optimized
+ * for expressions. The {@link StructKind} of a {@link RelRecordType} is not preserved; rows are
+ * always converted back using {@link StructKind#PEEK_FIELDS_NO_EXPAND}. Missing precision and
+ * scale are filled with Flink's default values so that each {@link LogicalType} is fully resolved.
+ */
+@Internal
+public final class LogicalRelDataTypeConverter {
+
+    public static RelDataType toRelDataType(
+            LogicalType logicalType, RelDataTypeFactory relDataTypeFactory) {
+        final LogicalToRelDataTypeConverter converter =
+                new LogicalToRelDataTypeConverter(relDataTypeFactory);
+        final RelDataType relDataType = logicalType.accept(converter);
+        // this also canonizes in the factory (see SqlTypeFactoryImpl.canonize)
+        return relDataTypeFactory.createTypeWithNullability(relDataType, logicalType.isNullable());
+    }
+
+    public static LogicalType toLogicalType(
+            RelDataType relDataType, DataTypeFactory dataTypeFactory) {
+        final LogicalType logicalType = toLogicalTypeNotNull(relDataType, dataTypeFactory);
+        return logicalType.copy(relDataType.isNullable());
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // LogicalType to RelDataType
+    // --------------------------------------------------------------------------------------------
+
+    private static class LogicalToRelDataTypeConverter implements LogicalTypeVisitor<RelDataType> {
+
+        private final RelDataTypeFactory relDataTypeFactory;
+
+        LogicalToRelDataTypeConverter(RelDataTypeFactory relDataTypeFactory) {
+            this.relDataTypeFactory = relDataTypeFactory;
+        }
+
+        @Override
+        public RelDataType visit(CharType charType) {
+            return relDataTypeFactory.createSqlType(SqlTypeName.CHAR, charType.getLength());
+        }
+
+        @Override
+        public RelDataType visit(VarCharType varCharType) {
+            return relDataTypeFactory.createSqlType(SqlTypeName.VARCHAR, varCharType.getLength());
+        }
+
+        @Override
+        public RelDataType visit(BooleanType booleanType) {
+            return relDataTypeFactory.createSqlType(SqlTypeName.BOOLEAN);
+        }
+
+        @Override
+        public RelDataType visit(BinaryType binaryType) {
+            return relDataTypeFactory.createSqlType(SqlTypeName.BINARY, binaryType.getLength());
+        }
+
+        @Override
+        public RelDataType visit(VarBinaryType varBinaryType) {
+            return relDataTypeFactory.createSqlType(
+                    SqlTypeName.VARBINARY, varBinaryType.getLength());
+        }
+
+        @Override
+        public RelDataType visit(DecimalType decimalType) {
+            return relDataTypeFactory.createSqlType(
+                    SqlTypeName.DECIMAL, decimalType.getPrecision(), decimalType.getScale());
+        }
+
+        @Override
+        public RelDataType visit(TinyIntType tinyIntType) {
+            return relDataTypeFactory.createSqlType(SqlTypeName.TINYINT);
+        }
+
+        @Override
+        public RelDataType visit(SmallIntType smallIntType) {
+            return relDataTypeFactory.createSqlType(SqlTypeName.SMALLINT);
+        }
+
+        @Override
+        public RelDataType visit(IntType intType) {
+            return relDataTypeFactory.createSqlType(SqlTypeName.INTEGER);
+        }
+
+        @Override
+        public RelDataType visit(BigIntType bigIntType) {
+            return relDataTypeFactory.createSqlType(SqlTypeName.BIGINT);
+        }
+
+        @Override
+        public RelDataType visit(FloatType floatType) {
+            return relDataTypeFactory.createSqlType(SqlTypeName.FLOAT);
+        }
+
+        @Override
+        public RelDataType visit(DoubleType doubleType) {
+            return relDataTypeFactory.createSqlType(SqlTypeName.DOUBLE);
+        }
+
+        @Override
+        public RelDataType visit(DateType dateType) {
+            return relDataTypeFactory.createSqlType(SqlTypeName.DATE);
+        }
+
+        @Override
+        public RelDataType visit(TimeType timeType) {
+            return relDataTypeFactory.createSqlType(SqlTypeName.TIME, timeType.getPrecision());
+        }
+
+        @Override
+        public RelDataType visit(TimestampType timestampType) {
+            final RelDataType timestampRelDataType =
+                    relDataTypeFactory.createSqlType(
+                            SqlTypeName.TIMESTAMP, timestampType.getPrecision());
+            switch (timestampType.getKind()) {
+                case REGULAR:
+                    return timestampRelDataType;
+                case ROWTIME:
+                    assert timestampType.getPrecision() == 3;
+                    return new TimeIndicatorRelDataType(
+                            relDataTypeFactory.getTypeSystem(),
+                            (BasicSqlType) timestampRelDataType,
+                            timestampType.isNullable(),
+                            true);
+                default:
+                    throw new TableException("Unknown timestamp kind.");
+            }
+        }
+
+        @Override
+        public RelDataType visit(ZonedTimestampType zonedTimestampType) {
+            throw new TableException("TIMESTAMP WITH TIME ZONE is currently not supported.");
+        }
+
+        @Override
+        public RelDataType visit(LocalZonedTimestampType localZonedTimestampType) {
+            final RelDataType timestampRelDataType =
+                    relDataTypeFactory.createSqlType(
+                            SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
+                            localZonedTimestampType.getPrecision());
+            switch (localZonedTimestampType.getKind()) {
+                case REGULAR:
+                    return timestampRelDataType;
+                case ROWTIME:
+                    assert localZonedTimestampType.getPrecision() == 3;
+                    return new TimeIndicatorRelDataType(
+                            relDataTypeFactory.getTypeSystem(),
+                            (BasicSqlType) timestampRelDataType,
+                            localZonedTimestampType.isNullable(),
+                            true);
+                case PROCTIME:
+                    assert localZonedTimestampType.getPrecision() == 3;
+                    return new TimeIndicatorRelDataType(
+                            relDataTypeFactory.getTypeSystem(),
+                            (BasicSqlType) timestampRelDataType,
+                            localZonedTimestampType.isNullable(),
+                            false);
+                default:
+                    throw new TableException("Unknown timestamp kind.");
+            }
+        }
+
+        @Override
+        public RelDataType visit(YearMonthIntervalType yearMonthIntervalType) {
+            final int yearPrecision = yearMonthIntervalType.getYearPrecision();
+            final SqlIntervalQualifier intervalQualifier;
+            switch (yearMonthIntervalType.getResolution()) {
+                case YEAR:
+                    intervalQualifier =
+                            new SqlIntervalQualifier(
+                                    TimeUnit.YEAR,
+                                    yearPrecision,
+                                    TimeUnit.YEAR,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    SqlParserPos.ZERO);
+                    break;
+                case YEAR_TO_MONTH:
+                    intervalQualifier =
+                            new SqlIntervalQualifier(
+                                    TimeUnit.YEAR,
+                                    yearPrecision,
+                                    TimeUnit.MONTH,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    SqlParserPos.ZERO);
+                    break;
+                case MONTH:
+                    intervalQualifier =
+                            new SqlIntervalQualifier(
+                                    TimeUnit.MONTH,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    TimeUnit.MONTH,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    SqlParserPos.ZERO);
+                    break;
+                default: // same exception type as the day-time interval conversion
+                    throw new TableException("Unknown interval resolution.");
+            }
+            return relDataTypeFactory.createSqlIntervalType(intervalQualifier);
+        }
+
+        @Override
+        public RelDataType visit(DayTimeIntervalType dayTimeIntervalType) {
+            final int dayPrecision = dayTimeIntervalType.getDayPrecision();
+            final int fractionalPrecision = dayTimeIntervalType.getFractionalPrecision();
+            final SqlIntervalQualifier intervalQualifier;
+            switch (dayTimeIntervalType.getResolution()) {
+                case DAY:
+                    intervalQualifier =
+                            new SqlIntervalQualifier(
+                                    TimeUnit.DAY,
+                                    dayPrecision,
+                                    TimeUnit.DAY,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    SqlParserPos.ZERO);
+                    break;
+                case DAY_TO_HOUR:
+                    intervalQualifier =
+                            new SqlIntervalQualifier(
+                                    TimeUnit.DAY,
+                                    dayPrecision,
+                                    TimeUnit.HOUR,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    SqlParserPos.ZERO);
+                    break;
+                case DAY_TO_MINUTE:
+                    intervalQualifier =
+                            new SqlIntervalQualifier(
+                                    TimeUnit.DAY,
+                                    dayPrecision,
+                                    TimeUnit.MINUTE,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    SqlParserPos.ZERO);
+                    break;
+                case DAY_TO_SECOND:
+                    intervalQualifier =
+                            new SqlIntervalQualifier(
+                                    TimeUnit.DAY,
+                                    dayPrecision,
+                                    TimeUnit.SECOND,
+                                    fractionalPrecision,
+                                    SqlParserPos.ZERO);
+                    break;
+                case HOUR:
+                    intervalQualifier =
+                            new SqlIntervalQualifier(
+                                    TimeUnit.HOUR,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    TimeUnit.HOUR,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    SqlParserPos.ZERO);
+                    break;
+                case HOUR_TO_MINUTE:
+                    intervalQualifier =
+                            new SqlIntervalQualifier(
+                                    TimeUnit.HOUR,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    TimeUnit.MINUTE,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    SqlParserPos.ZERO);
+                    break;
+                case HOUR_TO_SECOND:
+                    intervalQualifier =
+                            new SqlIntervalQualifier(
+                                    TimeUnit.HOUR,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    TimeUnit.SECOND,
+                                    fractionalPrecision,
+                                    SqlParserPos.ZERO);
+                    break;
+                case MINUTE:
+                    intervalQualifier =
+                            new SqlIntervalQualifier(
+                                    TimeUnit.MINUTE,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    TimeUnit.MINUTE,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    SqlParserPos.ZERO);
+                    break;
+                case MINUTE_TO_SECOND:
+                    intervalQualifier =
+                            new SqlIntervalQualifier(
+                                    TimeUnit.MINUTE,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    TimeUnit.SECOND,
+                                    fractionalPrecision,
+                                    SqlParserPos.ZERO);
+                    break;
+                case SECOND:
+                    intervalQualifier =
+                            new SqlIntervalQualifier(
+                                    TimeUnit.SECOND,
+                                    RelDataType.PRECISION_NOT_SPECIFIED,
+                                    TimeUnit.SECOND,
+                                    fractionalPrecision,
+                                    SqlParserPos.ZERO);
+                    break;
+                default:
+                    throw new TableException("Unknown interval resolution.");
+            }
+            return relDataTypeFactory.createSqlIntervalType(intervalQualifier);
+        }
+
+        @Override
+        public RelDataType visit(ArrayType arrayType) {
+            final RelDataType elementRelDataType =
+                    toRelDataType(arrayType.getElementType(), relDataTypeFactory);
+            return relDataTypeFactory.createArrayType(elementRelDataType, -1);
+        }
+
+        @Override
+        public RelDataType visit(MultisetType multisetType) {
+            final RelDataType elementRelDataType =
+                    toRelDataType(multisetType.getElementType(), relDataTypeFactory);
+            return relDataTypeFactory.createMultisetType(elementRelDataType, -1);
+        }
+
+        @Override
+        public RelDataType visit(MapType mapType) {
+            final RelDataType keyRelDataType =
+                    toRelDataType(mapType.getKeyType(), relDataTypeFactory);
+            final RelDataType valueRelDataType =
+                    toRelDataType(mapType.getValueType(), relDataTypeFactory);
+            return relDataTypeFactory.createMapType(keyRelDataType, valueRelDataType);
+        }
+
+        @Override
+        public RelDataType visit(RowType rowType) {
+            return relDataTypeFactory.createStructType(
+                    StructKind.PEEK_FIELDS_NO_EXPAND,
+                    rowType.getFields().stream()
+                            .map(f -> toRelDataType(f.getType(), relDataTypeFactory))
+                            .collect(Collectors.toList()),
+                    rowType.getFieldNames());
+        }
+
+        @Override
+        public RelDataType visit(DistinctType distinctType) {
+            throw new TableException("DISTINCT type is currently not supported.");
+        }
+
+        @Override
+        public RelDataType visit(StructuredType structuredType) {
+            final List<RelDataTypeField> fields = new ArrayList<>();
+            for (int i = 0; i < structuredType.getAttributes().size(); i++) {
+                final StructuredType.StructuredAttribute attribute =
+                        structuredType.getAttributes().get(i);
+                final RelDataTypeField field =
+                        new RelDataTypeFieldImpl(
+                                attribute.getName(),
+                                i,
+                                toRelDataType(attribute.getType(), relDataTypeFactory));
+                fields.add(field);
+            }
+            return new StructuredRelDataType(structuredType, fields);
+        }
+
+        @Override
+        public RelDataType visit(NullType nullType) {
+            return relDataTypeFactory.createSqlType(SqlTypeName.NULL);
+        }
+
+        @Override
+        public RelDataType visit(RawType<?> rawType) {
+            return new RawRelDataType(rawType);
+        }
+
+        @Override
+        public RelDataType visit(SymbolType<?> symbolType) {
+            return relDataTypeFactory.createSqlType(SqlTypeName.SYMBOL);
+        }
+
+        @Override
+        public RelDataType visit(LogicalType other) {
+            throw new TableException(
+                    String.format(
+                            "Logical type '%s' cannot be converted to a RelDataType.", other));
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // RelDataType to LogicalType
+    // --------------------------------------------------------------------------------------------
+
+    private static LogicalType toLogicalTypeNotNull(
+            RelDataType relDataType, DataTypeFactory dataTypeFactory) {
+        // dataTypeFactory is a preparation for catalog user-defined types
+        switch (relDataType.getSqlTypeName()) {
+            case BOOLEAN:
+                return new BooleanType(false);
+            case TINYINT:
+                return new TinyIntType(false);
+            case SMALLINT:
+                return new SmallIntType(false);
+            case INTEGER:
+                return new IntType(false);
+            case BIGINT:
+                return new BigIntType(false);
+            case DECIMAL:
+                if (relDataType.getScale() < 0) {
+                    // negative scale is not supported, normalize it
+                    return new DecimalType(
+                            false, relDataType.getPrecision() - relDataType.getScale(), 0);
+                }
+                return new DecimalType(false, relDataType.getPrecision(), relDataType.getScale());
+            case FLOAT:
+                return new FloatType(false);
+            case DOUBLE:
+                return new DoubleType(false);
+            case DATE:
+                return new DateType(false);
+            case TIME:
+                return new TimeType(false, relDataType.getPrecision());
+            case TIMESTAMP:
+                return new TimestampType(
+                        false, getTimestampKind(relDataType), relDataType.getPrecision());
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return new LocalZonedTimestampType(
+                        false, getTimestampKind(relDataType), relDataType.getPrecision());
+            case INTERVAL_YEAR:
+            case INTERVAL_YEAR_MONTH:
+            case INTERVAL_MONTH:
+                return new YearMonthIntervalType(
+                        false, getYearMonthResolution(relDataType), relDataType.getPrecision());
+            case INTERVAL_DAY:
+            case INTERVAL_DAY_HOUR:
+            case INTERVAL_DAY_MINUTE:
+            case INTERVAL_DAY_SECOND:
+            case INTERVAL_HOUR:
+            case INTERVAL_HOUR_MINUTE:
+            case INTERVAL_HOUR_SECOND:
+            case INTERVAL_MINUTE:
+            case INTERVAL_MINUTE_SECOND:
+                return new DayTimeIntervalType(
+                        false,
+                        getDayTimeResolution(relDataType),
+                        relDataType.getPrecision(),
+                        relDataType.getScale());
+            case INTERVAL_SECOND:
+                return new DayTimeIntervalType(
+                        false,
+                        getDayTimeResolution(relDataType),
+                        DayTimeIntervalType.DEFAULT_DAY_PRECISION,
+                        relDataType.getScale());
+            case CHAR:
+                if (relDataType.getPrecision() == 0) {
+                    return CharType.ofEmptyLiteral();
+                }
+                return new CharType(false, relDataType.getPrecision());
+            case VARCHAR:
+                if (relDataType.getPrecision() == 0) {
+                    return VarCharType.ofEmptyLiteral();
+                }
+                return new VarCharType(false, relDataType.getPrecision());
+            case BINARY:
+                if (relDataType.getPrecision() == 0) {
+                    return BinaryType.ofEmptyLiteral();
+                }
+                return new BinaryType(false, relDataType.getPrecision());
+            case VARBINARY:
+                if (relDataType.getPrecision() == 0) {
+                    return VarBinaryType.ofEmptyLiteral();
+                }
+                return new VarBinaryType(false, relDataType.getPrecision());
+            case NULL:
+                return new NullType();
+            case SYMBOL:
+                return new SymbolType<>(false);
+            case MULTISET:
+                return new MultisetType(
+                        false, toLogicalType(relDataType.getComponentType(), dataTypeFactory));
+            case ARRAY:
+                return new ArrayType(
+                        false, toLogicalType(relDataType.getComponentType(), dataTypeFactory));
+            case MAP:
+                return new MapType(
+                        false,
+                        toLogicalType(relDataType.getKeyType(), dataTypeFactory),
+                        toLogicalType(relDataType.getValueType(), dataTypeFactory));
+            case DISTINCT:
+                throw new TableException("DISTINCT type is currently not supported.");
+            case ROW:
+                return new RowType(
+                        false,
+                        relDataType.getFieldList().stream()
+                                .map(
+                                        f ->
+                                                new RowField(
+                                                        f.getName(),
+                                                        toLogicalType(
+                                                                f.getType(), dataTypeFactory)))
+                                .collect(Collectors.toList()));
+            case STRUCTURED:
+            case OTHER:
+                if (relDataType instanceof StructuredRelDataType) {
+                    return ((StructuredRelDataType) relDataType).getStructuredType();
+                } else if (relDataType instanceof RawRelDataType) {
+                    return ((RawRelDataType) relDataType).getRawType();
+                }
+                // fall through
+            case REAL:
+            case TIME_WITH_LOCAL_TIME_ZONE:
+            case ANY:
+            case CURSOR:
+            case COLUMN_LIST:
+            case DYNAMIC_STAR:
+            case GEOMETRY:
+            case SARG:
+            default:
+                throw new TableException("Unsupported RelDataType: " + relDataType);
+        }
+    }
+
+    /**
+     * Classifies a type's time attribute: time indicators map to ROWTIME when they are event
+     * time and to PROCTIME otherwise; any other type is REGULAR.
+     */
+    private static TimestampKind getTimestampKind(RelDataType relDataType) {
+        // guard clause: only time indicator types carry a time attribute
+        if (!(relDataType instanceof TimeIndicatorRelDataType)) {
+            return TimestampKind.REGULAR;
+        }
+        final boolean eventTime = ((TimeIndicatorRelDataType) relDataType).isEventTime();
+        return eventTime ? TimestampKind.ROWTIME : TimestampKind.PROCTIME;
+    }
+
+    private static YearMonthResolution getYearMonthResolution(RelDataType relDataType) {
+        switch (relDataType.getSqlTypeName()) { // resolution is derived from the SQL type name
+            case INTERVAL_YEAR:
+                return YearMonthResolution.YEAR;
+            case INTERVAL_YEAR_MONTH:
+                return YearMonthResolution.YEAR_TO_MONTH;
+            case INTERVAL_MONTH:
+                return YearMonthResolution.MONTH;
+            default: // no year-month resolution is mapped for any other type name
+                throw new TableException("Unsupported YearMonthResolution.");
+        }
+    }
+
+    private static DayTimeResolution getDayTimeResolution(RelDataType relDataType) {
+        switch (relDataType.getSqlTypeName()) { // resolution is derived from the SQL type name
+            case INTERVAL_DAY:
+                return DayTimeResolution.DAY;
+            case INTERVAL_DAY_HOUR:
+                return DayTimeResolution.DAY_TO_HOUR;
+            case INTERVAL_DAY_MINUTE:
+                return DayTimeResolution.DAY_TO_MINUTE;
+            case INTERVAL_DAY_SECOND:
+                return DayTimeResolution.DAY_TO_SECOND;
+            case INTERVAL_HOUR:
+                return DayTimeResolution.HOUR;
+            case INTERVAL_HOUR_MINUTE:
+                return DayTimeResolution.HOUR_TO_MINUTE;
+            case INTERVAL_HOUR_SECOND:
+                return DayTimeResolution.HOUR_TO_SECOND;
+            case INTERVAL_MINUTE:
+                return DayTimeResolution.MINUTE;
+            case INTERVAL_MINUTE_SECOND:
+                return DayTimeResolution.MINUTE_TO_SECOND;
+            case INTERVAL_SECOND:
+                return DayTimeResolution.SECOND;
+            default: // no day-time resolution is mapped for any other type name
+                throw new TableException("Unsupported DayTimeResolution.");
+        }
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerdeTest.java
index e2a335e..a7b176a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerdeTest.java
@@ -19,18 +19,7 @@
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.module.ModuleManager;
-import org.apache.flink.table.planner.calcite.FlinkContextImpl;
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
-import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.utils.CatalogManagerMocks;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
 
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -38,6 +27,9 @@ import org.junit.jupiter.params.provider.MethodSource;
 import java.io.IOException;
 import java.util.stream.Stream;
 
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.configuredSerdeContext;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.toJson;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.toObject;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link DataType} serialization and deserialization. */
@@ -48,7 +40,7 @@ public class DataTypeJsonSerdeTest {
     public void testDataTypeSerde(DataType dataType) throws IOException {
         final SerdeContext serdeContext = configuredSerdeContext();
         final String json = toJson(serdeContext, dataType);
-        final DataType actual = toDataType(serdeContext, json);
+        final DataType actual = toObject(serdeContext, json, DataType.class);
 
         assertThat(actual).isEqualTo(dataType);
     }
@@ -83,41 +75,6 @@ public class DataTypeJsonSerdeTest {
     // Shared utilities
     // --------------------------------------------------------------------------------------------
 
-    static SerdeContext configuredSerdeContext() {
-        return configuredSerdeContext(
-                CatalogManagerMocks.createEmptyCatalogManager(), TableConfig.getDefault());
-    }
-
-    static SerdeContext configuredSerdeContext(
-            CatalogManager catalogManager, TableConfig tableConfig) {
-        return new SerdeContext(
-                new FlinkContextImpl(
-                        false, tableConfig, new ModuleManager(), null, catalogManager, null),
-                Thread.currentThread().getContextClassLoader(),
-                FlinkTypeFactory.INSTANCE(),
-                FlinkSqlOperatorTable.instance());
-    }
-
-    static String toJson(SerdeContext serdeContext, DataType dataType) {
-        final ObjectWriter objectWriter = JsonSerdeUtil.createObjectWriter(serdeContext);
-        final String json;
-        try {
-            json = objectWriter.writeValueAsString(dataType);
-        } catch (JsonProcessingException e) {
-            throw new AssertionError(e);
-        }
-        return json;
-    }
-
-    static DataType toDataType(SerdeContext serdeContext, String json) {
-        final ObjectReader objectReader = JsonSerdeUtil.createObjectReader(serdeContext);
-        try {
-            return objectReader.readValue(json, DataType.class);
-        } catch (IOException e) {
-            throw new AssertionError(e);
-        }
-    }
-
     /** Testing class. */
     public static class PojoClass {
         public int f0;
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java
index 7d095f0..1fb5b43 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java
@@ -53,6 +53,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectRea
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
 
 import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.sql.SqlIntervalQualifier;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
@@ -221,7 +222,9 @@ public class DynamicTableSourceSpecSerdeTest {
                                                         BigDecimal.valueOf(1000),
                                                         new SqlIntervalQualifier(
                                                                 TimeUnit.SECOND,
+                                                                RelDataType.PRECISION_NOT_SPECIFIED,
                                                                 TimeUnit.SECOND,
+                                                                3,
                                                                 SqlParserPos.ZERO))),
                                         5000,
                                         RowType.of(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeMocks.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeMocks.java
new file mode 100644
index 0000000..f50dd9a
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeMocks.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.planner.calcite.FlinkContextImpl;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.utils.CatalogManagerMocks;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
+
+import java.io.IOException;
+
+/** Shared mocks and static helpers for the JSON serde tests of this package. */
+final class JsonSerdeMocks {
+
+    // static helpers only, never instantiated
+    private JsonSerdeMocks() {}
+
+    /** Creates a context over an empty catalog manager and the default {@link TableConfig}. */
+    static SerdeContext configuredSerdeContext() {
+        final CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager();
+        return configuredSerdeContext(catalogManager, TableConfig.getDefault());
+    }
+
+    static SerdeContext configuredSerdeContext(
+            CatalogManager catalogManager, TableConfig tableConfig) {
+        final FlinkContextImpl context =
+                new FlinkContextImpl(
+                        false, tableConfig, new ModuleManager(), null, catalogManager, null);
+        final ClassLoader loader = Thread.currentThread().getContextClassLoader();
+        return new SerdeContext(
+                context, loader, FlinkTypeFactory.INSTANCE(), FlinkSqlOperatorTable.instance());
+    }
+
+    /** Writes {@code object} as a JSON string; failures surface as {@link AssertionError}. */
+    static String toJson(SerdeContext serdeContext, Object object) {
+        final ObjectWriter writer = JsonSerdeUtil.createObjectWriter(serdeContext);
+        try {
+            return writer.writeValueAsString(object);
+        } catch (JsonProcessingException e) {
+            throw new AssertionError(e);
+        }
+    }
+
+    /** Parses {@code json} into {@code clazz}; failures surface as {@link AssertionError}. */
+    static <T> T toObject(SerdeContext serdeContext, String json, Class<T> clazz) {
+        final ObjectReader reader = JsonSerdeUtil.createObjectReader(serdeContext);
+        try {
+            return reader.readValue(json, clazz);
+        } catch (IOException e) {
+            throw new AssertionError(e);
+        }
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java
index d96dcb5..b8ff8a5 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java
@@ -69,8 +69,6 @@ import org.apache.flink.types.Row;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
 
 import org.junit.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -88,7 +86,9 @@ import java.util.Optional;
 import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
 import static org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanCompilation.ALL;
 import static org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanCompilation.IDENTIFIER;
-import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerdeTest.configuredSerdeContext;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.configuredSerdeContext;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.toJson;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.toObject;
 import static org.apache.flink.table.utils.CatalogManagerMocks.preparedCatalogManager;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -102,7 +102,7 @@ public class LogicalTypeJsonSerdeTest {
         final SerdeContext serdeContext = configuredSerdeContext();
 
         final String json = toJson(serdeContext, logicalType);
-        final LogicalType actual = toLogicalType(serdeContext, json);
+        final LogicalType actual = toObject(serdeContext, json, LogicalType.class);
 
         assertThat(actual).isEqualTo(logicalType);
     }
@@ -126,7 +126,7 @@ public class LogicalTypeJsonSerdeTest {
                 TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS,
                 TableConfigOptions.CatalogPlanRestore.IDENTIFIER);
         dataTypeFactoryMock.logicalType = Optional.empty();
-        assertThatThrownBy(() -> toLogicalType(serdeContext, minimalJson))
+        assertThatThrownBy(() -> toObject(serdeContext, minimalJson, LogicalType.class))
                 .satisfies(anyCauseMatches(ValidationException.class, "No type found."));
 
         // catalog lookup
@@ -134,7 +134,8 @@ public class LogicalTypeJsonSerdeTest {
                 TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS,
                 TableConfigOptions.CatalogPlanRestore.IDENTIFIER);
         dataTypeFactoryMock.logicalType = Optional.of(STRUCTURED_TYPE);
-        assertThat(toLogicalType(serdeContext, minimalJson)).isEqualTo(STRUCTURED_TYPE);
+        assertThat(toObject(serdeContext, minimalJson, LogicalType.class))
+                .isEqualTo(STRUCTURED_TYPE);
 
         // maximum plan content
         config.set(TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS, ALL);
@@ -151,7 +152,7 @@ public class LogicalTypeJsonSerdeTest {
                 TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS,
                 TableConfigOptions.CatalogPlanRestore.IDENTIFIER);
         dataTypeFactoryMock.logicalType = Optional.empty();
-        assertThatThrownBy(() -> toLogicalType(serdeContext, maximumJson))
+        assertThatThrownBy(() -> toObject(serdeContext, maximumJson, LogicalType.class))
                 .satisfies(anyCauseMatches(ValidationException.class, "No type found."));
 
         // catalog lookup
@@ -159,14 +160,16 @@ public class LogicalTypeJsonSerdeTest {
                 TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS,
                 TableConfigOptions.CatalogPlanRestore.IDENTIFIER);
         dataTypeFactoryMock.logicalType = Optional.of(UPDATED_STRUCTURED_TYPE);
-        assertThat(toLogicalType(serdeContext, maximumJson)).isEqualTo(UPDATED_STRUCTURED_TYPE);
+        assertThat(toObject(serdeContext, maximumJson, LogicalType.class))
+                .isEqualTo(UPDATED_STRUCTURED_TYPE);
 
         // no lookup
         config.set(
                 TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS,
                 TableConfigOptions.CatalogPlanRestore.ALL);
         dataTypeFactoryMock.logicalType = Optional.of(UPDATED_STRUCTURED_TYPE);
-        assertThat(toLogicalType(serdeContext, maximumJson)).isEqualTo(STRUCTURED_TYPE);
+        assertThat(toObject(serdeContext, maximumJson, LogicalType.class))
+                .isEqualTo(STRUCTURED_TYPE);
     }
 
     // --------------------------------------------------------------------------------------------
@@ -418,28 +421,4 @@ public class LogicalTypeJsonSerdeTest {
             DataType dataType, boolean isInternalType) {
         return isInternalType ? dataType.toInternal() : dataType;
     }
-
-    // --------------------------------------------------------------------------------------------
-    // Shared utilities
-    // --------------------------------------------------------------------------------------------
-
-    static String toJson(SerdeContext serdeContext, LogicalType logicalType) {
-        final ObjectWriter objectWriter = JsonSerdeUtil.createObjectWriter(serdeContext);
-        final String json;
-        try {
-            json = objectWriter.writeValueAsString(logicalType);
-        } catch (JsonProcessingException e) {
-            throw new AssertionError(e);
-        }
-        return json;
-    }
-
-    static LogicalType toLogicalType(SerdeContext serdeContext, String json) {
-        final ObjectReader objectReader = JsonSerdeUtil.createObjectReader(serdeContext);
-        try {
-            return objectReader.readValue(json, LogicalType.class);
-        } catch (IOException e) {
-            throw new AssertionError(e);
-        }
-    }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerdeTest.java
index 69aa587..5426fe2 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RelDataTypeJsonSerdeTest.java
@@ -18,26 +18,15 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
-import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.common.typeutils.base.VoidSerializer;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.module.ModuleManager;
-import org.apache.flink.table.planner.calcite.FlinkContextImpl;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
-import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
 import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
 import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.LegacyTypeInformationType;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RawType;
 import org.apache.flink.table.types.logical.StructuredType;
-import org.apache.flink.table.types.logical.TypeInformationRawType;
 import org.apache.flink.table.types.logical.VarCharType;
-import org.apache.flink.table.utils.CatalogManagerMocks;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
 
 import org.apache.calcite.avatica.util.TimeUnit;
 import org.apache.calcite.rel.type.RelDataType;
@@ -46,83 +35,123 @@ import org.apache.calcite.sql.SqlIntervalQualifier;
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.runners.Parameterized.Parameters;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.configuredSerdeContext;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.toJson;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeMocks.toObject;
+import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for serialization/deserialization of {@link RelDataType}. */
-@RunWith(Parameterized.class)
+/** Tests for {@link RelDataType} serialization and deserialization. */
 public class RelDataTypeJsonSerdeTest {
+
     private static final FlinkTypeFactory FACTORY = FlinkTypeFactory.INSTANCE();
 
-    @Parameterized.Parameters(name = "type = {0}")
-    public static Collection<RelDataType> parameters() {
+    @ParameterizedTest
+    @MethodSource("testRelDataTypeSerde")
+    public void testRelDataTypeSerde(RelDataType relDataType) throws IOException {
+        final SerdeContext context = configuredSerdeContext();
+        final String serialized = toJson(context, relDataType);
+
+        // isSameAs: the restored type must be the identical instance, not merely an equal one
+        final RelDataType restored = toObject(context, serialized, RelDataType.class);
+        assertThat(restored).isSameAs(relDataType);
+    }
+
+    @Test
+    public void testMissingPrecisionAndScale() {
+        final SerdeContext context = configuredSerdeContext();
+        // input: DAY TO SECOND interval whose qualifier is built without explicit precisions
+        final RelDataType unspecified =
+                FACTORY.createSqlIntervalType(
+                        new SqlIntervalQualifier(
+                                TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO));
+        final String json = toJson(context, unspecified);
+        final RelDataType restored = toObject(context, json, RelDataType.class);
+
+        // expectation: the same interval with the DayTimeIntervalType defaults spelled out
+        final RelDataType withDefaults =
+                FACTORY.createSqlIntervalType(
+                        new SqlIntervalQualifier(
+                                TimeUnit.DAY,
+                                DayTimeIntervalType.DEFAULT_DAY_PRECISION,
+                                TimeUnit.SECOND,
+                                DayTimeIntervalType.DEFAULT_FRACTIONAL_PRECISION,
+                                SqlParserPos.ZERO));
+        assertThat(restored).isSameAs(withDefaults);
+    }
+
+    @Test
+    public void testNegativeScale() {
+        final SerdeContext context = configuredSerdeContext();
+        final RelDataType negativeScale = FACTORY.createSqlType(SqlTypeName.DECIMAL, 5, -1);
+        // DECIMAL(5, -1) is expected to come back as DECIMAL(6, 0)
+        final String json = toJson(context, negativeScale);
+        final RelDataType restored = toObject(context, json, RelDataType.class);
+        assertThat(restored).isSameAs(FACTORY.createSqlType(SqlTypeName.DECIMAL, 6, 0));
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Test data
+    // --------------------------------------------------------------------------------------------
+
+    @Parameters(name = "{0}")
+    public static List<RelDataType> testRelDataTypeSerde() {
         // the values in the list do not care about nullable.
-        List<RelDataType> types =
+        final List<RelDataType> types =
                 Arrays.asList(
                         FACTORY.createSqlType(SqlTypeName.BOOLEAN),
                         FACTORY.createSqlType(SqlTypeName.TINYINT),
                         FACTORY.createSqlType(SqlTypeName.SMALLINT),
                         FACTORY.createSqlType(SqlTypeName.INTEGER),
                         FACTORY.createSqlType(SqlTypeName.BIGINT),
-                        FACTORY.createSqlType(SqlTypeName.DECIMAL, 3, 10),
-                        FACTORY.createSqlType(SqlTypeName.DECIMAL, 0, 19),
-                        FACTORY.createSqlType(SqlTypeName.DECIMAL, -1, 19),
+                        FACTORY.createSqlType(SqlTypeName.DECIMAL, 10, 3),
+                        FACTORY.createSqlType(SqlTypeName.DECIMAL, 19, 0),
+                        FACTORY.createSqlType(SqlTypeName.DECIMAL, 38, 19),
                         FACTORY.createSqlType(SqlTypeName.FLOAT),
-                        FACTORY.createSqlType(SqlTypeName.REAL),
                         FACTORY.createSqlType(SqlTypeName.DOUBLE),
                         FACTORY.createSqlType(SqlTypeName.DATE),
                         FACTORY.createSqlType(SqlTypeName.TIME),
-                        FACTORY.createSqlType(SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE),
                         FACTORY.createSqlType(SqlTypeName.TIMESTAMP),
                         FACTORY.createSqlType(SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE),
                         FACTORY.createSqlIntervalType(
                                 new SqlIntervalQualifier(
-                                        TimeUnit.YEAR, TimeUnit.YEAR, SqlParserPos.ZERO)),
-                        FACTORY.createSqlIntervalType(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO)),
-                        FACTORY.createSqlIntervalType(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.MONTH, TimeUnit.MONTH, SqlParserPos.ZERO)),
-                        FACTORY.createSqlIntervalType(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.DAY, TimeUnit.DAY, SqlParserPos.ZERO)),
+                                        TimeUnit.DAY,
+                                        2,
+                                        TimeUnit.MINUTE,
+                                        RelDataType.PRECISION_NOT_SPECIFIED,
+                                        SqlParserPos.ZERO)),
                         FACTORY.createSqlIntervalType(
                                 new SqlIntervalQualifier(
-                                        TimeUnit.DAY, TimeUnit.HOUR, SqlParserPos.ZERO)),
+                                        TimeUnit.DAY, 6, TimeUnit.SECOND, 9, SqlParserPos.ZERO)),
                         FACTORY.createSqlIntervalType(
                                 new SqlIntervalQualifier(
-                                        TimeUnit.DAY, TimeUnit.MINUTE, SqlParserPos.ZERO)),
+                                        TimeUnit.HOUR,
+                                        RelDataType.PRECISION_NOT_SPECIFIED,
+                                        TimeUnit.SECOND,
+                                        9,
+                                        SqlParserPos.ZERO)),
                         FACTORY.createSqlIntervalType(
                                 new SqlIntervalQualifier(
-                                        TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO)),
+                                        TimeUnit.MINUTE,
+                                        RelDataType.PRECISION_NOT_SPECIFIED,
+                                        TimeUnit.SECOND,
+                                        0,
+                                        SqlParserPos.ZERO)),
                         FACTORY.createSqlIntervalType(
                                 new SqlIntervalQualifier(
-                                        TimeUnit.HOUR, TimeUnit.HOUR, SqlParserPos.ZERO)),
-                        FACTORY.createSqlIntervalType(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.HOUR, TimeUnit.MINUTE, SqlParserPos.ZERO)),
-                        FACTORY.createSqlIntervalType(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.HOUR, TimeUnit.SECOND, SqlParserPos.ZERO)),
-                        FACTORY.createSqlIntervalType(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.MINUTE, TimeUnit.MINUTE, SqlParserPos.ZERO)),
-                        FACTORY.createSqlIntervalType(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.MINUTE, TimeUnit.SECOND, SqlParserPos.ZERO)),
-                        FACTORY.createSqlIntervalType(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.SECOND, TimeUnit.SECOND, SqlParserPos.ZERO)),
+                                        TimeUnit.SECOND,
+                                        RelDataType.PRECISION_NOT_SPECIFIED,
+                                        TimeUnit.SECOND,
+                                        6,
+                                        SqlParserPos.ZERO)),
                         FACTORY.createSqlType(SqlTypeName.CHAR),
                         FACTORY.createSqlType(SqlTypeName.CHAR, 0),
                         FACTORY.createSqlType(SqlTypeName.CHAR, 32),
@@ -136,7 +165,6 @@ public class RelDataTypeJsonSerdeTest {
                         FACTORY.createSqlType(SqlTypeName.VARBINARY, 0),
                         FACTORY.createSqlType(SqlTypeName.VARBINARY, 1000),
                         FACTORY.createSqlType(SqlTypeName.NULL),
-                        FACTORY.createSqlType(SqlTypeName.ANY),
                         FACTORY.createSqlType(SqlTypeName.SYMBOL),
                         FACTORY.createMultisetType(FACTORY.createSqlType(SqlTypeName.VARCHAR), -1),
                         FACTORY.createArrayType(FACTORY.createSqlType(SqlTypeName.VARCHAR, 16), -1),
@@ -156,17 +184,16 @@ public class RelDataTypeJsonSerdeTest {
                                                 FACTORY.createSqlType(SqlTypeName.INTEGER),
                                                 FACTORY.createSqlType(SqlTypeName.VARCHAR, 10)),
                                         -1)),
-                        FACTORY.createSqlType(SqlTypeName.DISTINCT),
-                        FACTORY.createSqlType(SqlTypeName.STRUCTURED),
                         // simple struct type
                         FACTORY.createStructType(
-                                StructKind.PEEK_FIELDS,
+                                StructKind.PEEK_FIELDS_NO_EXPAND,
                                 Arrays.asList(
                                         FACTORY.createSqlType(SqlTypeName.INTEGER),
-                                        FACTORY.createSqlType(SqlTypeName.DECIMAL, 3, 10)),
+                                        FACTORY.createSqlType(SqlTypeName.DECIMAL, 10, 3)),
                                 Arrays.asList("f1", "f2")),
                         // struct type with array type
                         FACTORY.createStructType(
+                                StructKind.PEEK_FIELDS_NO_EXPAND,
                                 Arrays.asList(
                                         FACTORY.createSqlType(SqlTypeName.VARCHAR),
                                         FACTORY.createArrayType(
@@ -175,8 +202,10 @@ public class RelDataTypeJsonSerdeTest {
                                 Arrays.asList("f1", "f2")),
                         // nested struct type
                         FACTORY.createStructType(
+                                StructKind.PEEK_FIELDS_NO_EXPAND,
                                 Arrays.asList(
                                         FACTORY.createStructType(
+                                                StructKind.PEEK_FIELDS_NO_EXPAND,
                                                 Arrays.asList(
                                                         FACTORY.createSqlType(
                                                                 SqlTypeName.VARCHAR, 5),
@@ -187,13 +216,10 @@ public class RelDataTypeJsonSerdeTest {
                                                 FACTORY.createSqlType(SqlTypeName.VARCHAR, 16),
                                                 -1)),
                                 Arrays.asList("f3", "f4")),
-                        FACTORY.createSqlType(SqlTypeName.SARG),
                         FACTORY.createRowtimeIndicatorType(true, false),
                         FACTORY.createRowtimeIndicatorType(true, true),
                         FACTORY.createProctimeIndicatorType(true),
                         FACTORY.createFieldTypeFromLogicalType(
-                                new LegacyTypeInformationType<>(LogicalTypeRoot.RAW, Types.STRING)),
-                        FACTORY.createFieldTypeFromLogicalType(
                                 StructuredType.newBuilder(
                                                 ObjectIdentifier.of("cat", "db", "structuredType"),
                                                 DataTypeJsonSerdeTest.PojoClass.class)
@@ -213,79 +239,28 @@ public class RelDataTypeJsonSerdeTest {
                                         .description("description for StructuredType")
                                         .build()));
 
-        List<RelDataType> ret = new ArrayList<>(types.size() * 2);
+        final List<RelDataType> mutableTypes = new ArrayList<>(types.size() * 2);
         for (RelDataType type : types) {
-            ret.add(FACTORY.createTypeWithNullability(type, true));
-            ret.add(FACTORY.createTypeWithNullability(type, false));
+            mutableTypes.add(FACTORY.createTypeWithNullability(type, true));
+            mutableTypes.add(FACTORY.createTypeWithNullability(type, false));
         }
 
-        ret.add(
+        mutableTypes.add(
                 FACTORY.createTypeWithNullability(
                         FACTORY.createFieldTypeFromLogicalType(
                                 new RawType<>(true, Void.class, VoidSerializer.INSTANCE)),
                         true));
-        ret.add(
+        mutableTypes.add(
                 FACTORY.createTypeWithNullability(
                         FACTORY.createFieldTypeFromLogicalType(
                                 new RawType<>(false, Void.class, VoidSerializer.INSTANCE)),
                         false));
-        ret.add(
+        mutableTypes.add(
                 FACTORY.createTypeWithNullability(
                         FACTORY.createFieldTypeFromLogicalType(
                                 new RawType<>(true, Void.class, VoidSerializer.INSTANCE)),
                         false));
-        ret.add(
-                FACTORY.createTypeWithNullability(
-                        FACTORY.createFieldTypeFromLogicalType(
-                                new TypeInformationRawType<>(true, Types.STRING)),
-                        true));
-        ret.add(
-                FACTORY.createTypeWithNullability(
-                        FACTORY.createFieldTypeFromLogicalType(
-                                new TypeInformationRawType<>(false, Types.STRING)),
-                        false));
-        ret.add(
-                FACTORY.createTypeWithNullability(
-                        FACTORY.createFieldTypeFromLogicalType(
-                                new TypeInformationRawType<>(true, Types.STRING)),
-                        false));
 
-        return ret;
-    }
-
-    @Parameterized.Parameter public RelDataType relDataType;
-
-    @Test
-    public void testTypeSerde() throws IOException {
-        SerdeContext serdeCtx =
-                new SerdeContext(
-                        new FlinkContextImpl(
-                                false,
-                                TableConfig.getDefault(),
-                                new ModuleManager(),
-                                null,
-                                CatalogManagerMocks.createEmptyCatalogManager(),
-                                null),
-                        Thread.currentThread().getContextClassLoader(),
-                        FlinkTypeFactory.INSTANCE(),
-                        FlinkSqlOperatorTable.instance());
-        ObjectReader objectReader = JsonSerdeUtil.createObjectReader(serdeCtx);
-        ObjectWriter objectWriter = JsonSerdeUtil.createObjectWriter(serdeCtx);
-
-        final String json = objectWriter.writeValueAsString(relDataType);
-
-        RelDataType actual = objectReader.readValue(json, RelDataType.class);
-        // type system will fill the default precision if the precision is not defined
-        if (relDataType.toString().equals("DECIMAL")) {
-            assertEquals(SqlTypeName.DECIMAL, actual.getSqlTypeName());
-            assertEquals(relDataType.getScale(), actual.getScale());
-            assertEquals(
-                    serdeCtx.getTypeFactory()
-                            .getTypeSystem()
-                            .getDefaultPrecision(SqlTypeName.DECIMAL),
-                    actual.getPrecision());
-        } else {
-            assertSame(relDataType, actual);
-        }
+        return mutableTypes;
     }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeSerdeTest.java
index becc4030..07a6ad2 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeSerdeTest.java
@@ -35,7 +35,6 @@ import org.apache.flink.table.planner.functions.utils.ScalarSqlFunction;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.utils.EncodingUtils;
 
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
 
@@ -60,7 +59,6 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.io.StringWriter;
 import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.List;
@@ -96,12 +94,12 @@ public class RexNodeSerdeTest {
         RexBuilder rexBuilder = new RexBuilder(FACTORY);
         RelDataType inputType =
                 FACTORY.createStructType(
-                        StructKind.PEEK_FIELDS,
+                        StructKind.PEEK_FIELDS_NO_EXPAND,
                         Arrays.asList(
                                 FACTORY.createSqlType(SqlTypeName.INTEGER),
                                 FACTORY.createSqlType(SqlTypeName.BIGINT),
                                 FACTORY.createStructType(
-                                        StructKind.PEEK_FIELDS,
+                                        StructKind.PEEK_FIELDS_NO_EXPAND,
                                         Arrays.asList(
                                                 FACTORY.createSqlType(SqlTypeName.VARCHAR),
                                                 FACTORY.createSqlType(SqlTypeName.VARCHAR)),
@@ -133,100 +131,34 @@ public class RexNodeSerdeTest {
                                 FACTORY.createSqlType(SqlTypeName.FLOAT)),
                         rexBuilder.makeExactLiteral(BigDecimal.valueOf(random.nextDouble())),
                         rexBuilder.makeIntervalLiteral(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.YEAR, TimeUnit.YEAR, SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
                                 BigDecimal.valueOf(100),
                                 new SqlIntervalQualifier(
-                                        TimeUnit.YEAR, TimeUnit.YEAR, SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                BigDecimal.valueOf(3),
-                                new SqlIntervalQualifier(
-                                        TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.MONTH, TimeUnit.MONTH, SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                BigDecimal.valueOf(3),
-                                new SqlIntervalQualifier(
-                                        TimeUnit.MONTH, TimeUnit.MONTH, SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.DAY, TimeUnit.DAY, SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                BigDecimal.valueOf(3),
-                                new SqlIntervalQualifier(
-                                        TimeUnit.DAY, TimeUnit.DAY, SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.DAY, TimeUnit.HOUR, SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                BigDecimal.valueOf(3),
-                                new SqlIntervalQualifier(
-                                        TimeUnit.DAY, TimeUnit.HOUR, SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.DAY, TimeUnit.MINUTE, SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                BigDecimal.valueOf(3),
-                                new SqlIntervalQualifier(
-                                        TimeUnit.DAY, TimeUnit.MINUTE, SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                BigDecimal.valueOf(3),
-                                new SqlIntervalQualifier(
-                                        TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.HOUR, TimeUnit.HOUR, SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                BigDecimal.valueOf(3),
-                                new SqlIntervalQualifier(
-                                        TimeUnit.HOUR, TimeUnit.HOUR, SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.HOUR, TimeUnit.MINUTE, SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                BigDecimal.valueOf(3),
-                                new SqlIntervalQualifier(
-                                        TimeUnit.HOUR, TimeUnit.MINUTE, SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.HOUR, TimeUnit.SECOND, SqlParserPos.ZERO)),
+                                        TimeUnit.YEAR,
+                                        4,
+                                        TimeUnit.YEAR,
+                                        RelDataType.PRECISION_NOT_SPECIFIED,
+                                        SqlParserPos.ZERO)),
                         rexBuilder.makeIntervalLiteral(
                                 BigDecimal.valueOf(3),
                                 new SqlIntervalQualifier(
-                                        TimeUnit.HOUR, TimeUnit.SECOND, SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.MINUTE, TimeUnit.MINUTE, SqlParserPos.ZERO)),
+                                        TimeUnit.YEAR,
+                                        2,
+                                        TimeUnit.MONTH,
+                                        RelDataType.PRECISION_NOT_SPECIFIED,
+                                        SqlParserPos.ZERO)),
                         rexBuilder.makeIntervalLiteral(
                                 BigDecimal.valueOf(3),
                                 new SqlIntervalQualifier(
-                                        TimeUnit.MINUTE, TimeUnit.MINUTE, SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.MINUTE, TimeUnit.SECOND, SqlParserPos.ZERO)),
+                                        TimeUnit.DAY, 2, TimeUnit.SECOND, 6, SqlParserPos.ZERO)),
                         rexBuilder.makeIntervalLiteral(
                                 BigDecimal.valueOf(3),
                                 new SqlIntervalQualifier(
-                                        TimeUnit.MINUTE, TimeUnit.SECOND, SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                new SqlIntervalQualifier(
-                                        TimeUnit.SECOND, TimeUnit.SECOND, SqlParserPos.ZERO)),
-                        rexBuilder.makeIntervalLiteral(
-                                BigDecimal.valueOf(3),
-                                new SqlIntervalQualifier(
-                                        TimeUnit.SECOND, TimeUnit.SECOND, SqlParserPos.ZERO)),
+                                        TimeUnit.SECOND, 2, TimeUnit.SECOND, 6, SqlParserPos.ZERO)),
                         rexBuilder.makeDateLiteral(DateString.fromDaysSinceEpoch(10)),
                         rexBuilder.makeDateLiteral(new DateString("2000-12-12")),
                         rexBuilder.makeTimeLiteral(TimeString.fromMillisOfDay(1234), 3),
                         rexBuilder.makeTimeLiteral(TimeString.fromMillisOfDay(123456), 6),
                         rexBuilder.makeTimeLiteral(new TimeString("01:01:01.000000001"), 9),
-                        rexBuilder.makeTimeWithLocalTimeZoneLiteral(
-                                TimeString.fromMillisOfDay(1234), 3),
                         rexBuilder.makeTimestampLiteral(
                                 TimestampString.fromMillisSinceEpoch(1234), 3),
                         rexBuilder.makeTimestampLiteral(
@@ -245,6 +177,7 @@ public class RexNodeSerdeTest {
                         rexBuilder.makeLiteral(
                                 Arrays.<Object>asList(1, 2L),
                                 FACTORY.createStructType(
+                                        StructKind.PEEK_FIELDS_NO_EXPAND,
                                         Arrays.asList(
                                                 FACTORY.createSqlType(SqlTypeName.INTEGER),
                                                 FACTORY.createSqlType(SqlTypeName.BIGINT)),
@@ -373,11 +306,7 @@ public class RexNodeSerdeTest {
         ObjectReader objectReader = JsonSerdeUtil.createObjectReader(serdeCtx);
         ObjectWriter objectWriter = JsonSerdeUtil.createObjectWriter(serdeCtx);
 
-        StringWriter writer = new StringWriter(100);
-        try (JsonGenerator gen = objectWriter.getFactory().createGenerator(writer)) {
-            gen.writeObject(rexNode);
-        }
-        String json = writer.toString();
+        String json = objectWriter.writeValueAsString(rexNode);
         RexNode actual = objectReader.readValue(json, RexNode.class);
         assertEquals(rexNode, actual);
     }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundSerdeTest.java
index 2a04c58..1f69163 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexWindowBoundSerdeTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.planner.calcite.FlinkContextImpl;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.utils.CatalogManagerMocks;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
@@ -48,7 +49,7 @@ public class RexWindowBoundSerdeTest {
                                 TableConfig.getDefault(),
                                 new ModuleManager(),
                                 null,
-                                null,
+                                CatalogManagerMocks.createEmptyCatalogManager(),
                                 null),
                         Thread.currentThread().getContextClassLoader(),
                         FlinkTypeFactory.INSTANCE(),
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java
index 9ad7670..ff56411 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java
@@ -38,7 +38,6 @@ import org.apache.flink.table.planner.plan.schema.TableSourceTable;
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
 import org.apache.flink.table.utils.CatalogManagerMocks;
 
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter;
 
@@ -47,7 +46,6 @@ import org.apache.calcite.sql.type.SqlTypeName;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.io.StringWriter;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -81,13 +79,9 @@ public class TemporalTableSourceSpecSerdeTest {
         ObjectReader objectReader = JsonSerdeUtil.createObjectReader(serdeCtx);
         ObjectWriter objectWriter = JsonSerdeUtil.createObjectWriter(serdeCtx);
 
-        StringWriter writer = new StringWriter(100);
         List<TemporalTableSourceSpec> specs = testData();
         for (TemporalTableSourceSpec spec : specs) {
-            try (JsonGenerator gen = objectWriter.getFactory().createGenerator(writer)) {
-                gen.writeObject(spec);
-            }
-            String json = writer.toString();
+            String json = objectWriter.writeValueAsString(spec);
             TemporalTableSourceSpec actual =
                     objectReader.readValue(json, TemporalTableSourceSpec.class);
             assertEquals(spec.getTableSourceSpec(), actual.getTableSourceSpec());
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/typeutils/LogicalRelDataTypeConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/typeutils/LogicalRelDataTypeConverterTest.java
new file mode 100644
index 0000000..7ef17fe
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/typeutils/LogicalRelDataTypeConverterTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.typeutils;
+
+import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerdeTest;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.SymbolType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.utils.DataTypeFactoryMock;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.test.TableAssertions.assertThat;
+
+/** Tests for {@link LogicalRelDataTypeConverter}. */
+public class LogicalRelDataTypeConverterTest {
+
+    @ParameterizedTest
+    @MethodSource("testConversion")
+    public void testConversion(LogicalType logicalType) throws IOException {
+        final RelDataTypeFactory typeFactory = FlinkTypeFactory.INSTANCE();
+        final DataTypeFactoryMock dataTypeFactory = new DataTypeFactoryMock();
+        final RelDataType relDataType =
+                LogicalRelDataTypeConverter.toRelDataType(logicalType, typeFactory);
+        assertThat(LogicalRelDataTypeConverter.toLogicalType(relDataType, dataTypeFactory))
+                .isEqualTo(logicalType);
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Test data
+    // --------------------------------------------------------------------------------------------
+
+    @Parameters(name = "{0}")
+    private static Stream<LogicalType> testConversion() {
+        return Stream.of(
+                new BooleanType(),
+                new TinyIntType(),
+                new SmallIntType(),
+                new IntType(),
+                new BigIntType(),
+                new FloatType(),
+                new DoubleType(),
+                new DecimalType(10),
+                new DecimalType(15, 5),
+                CharType.ofEmptyLiteral(),
+                new CharType(),
+                new CharType(5),
+                VarCharType.ofEmptyLiteral(),
+                new VarCharType(),
+                new VarCharType(5),
+                BinaryType.ofEmptyLiteral(),
+                new BinaryType(),
+                new BinaryType(100),
+                VarBinaryType.ofEmptyLiteral(),
+                new VarBinaryType(),
+                new VarBinaryType(100),
+                new DateType(),
+                new TimeType(),
+                new TimeType(3),
+                new TimestampType(),
+                new TimestampType(3),
+                new LocalZonedTimestampType(false, TimestampKind.PROCTIME, 3),
+                new TimestampType(false, TimestampKind.ROWTIME, 3),
+                new LocalZonedTimestampType(),
+                new LocalZonedTimestampType(3),
+                new LocalZonedTimestampType(false, TimestampKind.PROCTIME, 3),
+                new LocalZonedTimestampType(false, TimestampKind.ROWTIME, 3),
+                new DayTimeIntervalType(DayTimeIntervalType.DayTimeResolution.DAY_TO_HOUR),
+                new DayTimeIntervalType(
+                        false, DayTimeIntervalType.DayTimeResolution.DAY_TO_HOUR, 3, 6),
+                new YearMonthIntervalType(YearMonthIntervalType.YearMonthResolution.YEAR_TO_MONTH),
+                new YearMonthIntervalType(
+                        false, YearMonthIntervalType.YearMonthResolution.MONTH, 2),
+                new LocalZonedTimestampType(),
+                new LocalZonedTimestampType(false, TimestampKind.PROCTIME, 3),
+                new SymbolType<>(),
+                new ArrayType(new IntType(false)),
+                new ArrayType(new LocalZonedTimestampType(false, TimestampKind.ROWTIME, 3)),
+                new ArrayType(new TimestampType()),
+                new ArrayType(CharType.ofEmptyLiteral()),
+                new ArrayType(VarCharType.ofEmptyLiteral()),
+                new ArrayType(BinaryType.ofEmptyLiteral()),
+                new ArrayType(VarBinaryType.ofEmptyLiteral()),
+                new MapType(new BigIntType(), new IntType(false)),
+                new MapType(
+                        new TimestampType(false, TimestampKind.ROWTIME, 3),
+                        new LocalZonedTimestampType()),
+                new MapType(CharType.ofEmptyLiteral(), CharType.ofEmptyLiteral()),
+                new MapType(VarCharType.ofEmptyLiteral(), VarCharType.ofEmptyLiteral()),
+                new MapType(BinaryType.ofEmptyLiteral(), BinaryType.ofEmptyLiteral()),
+                new MapType(VarBinaryType.ofEmptyLiteral(), VarBinaryType.ofEmptyLiteral()),
+                new MultisetType(new IntType(false)),
+                new MultisetType(new TimestampType()),
+                new MultisetType(new TimestampType(true, TimestampKind.ROWTIME, 3)),
+                new MultisetType(CharType.ofEmptyLiteral()),
+                new MultisetType(VarCharType.ofEmptyLiteral()),
+                new MultisetType(BinaryType.ofEmptyLiteral()),
+                new MultisetType(VarBinaryType.ofEmptyLiteral()),
+                RowType.of(new BigIntType(), new IntType(false), new VarCharType(200)),
+                RowType.of(
+                        new LogicalType[] {
+                            new BigIntType(), new IntType(false), new VarCharType(200)
+                        },
+                        new String[] {"f1", "f2", "f3"}),
+                RowType.of(
+                        new TimestampType(false, TimestampKind.ROWTIME, 3),
+                        new TimestampType(false, TimestampKind.REGULAR, 3),
+                        new LocalZonedTimestampType(false, TimestampKind.ROWTIME, 3),
+                        new LocalZonedTimestampType(false, TimestampKind.PROCTIME, 3),
+                        new LocalZonedTimestampType(false, TimestampKind.REGULAR, 3)),
+                RowType.of(
+                        CharType.ofEmptyLiteral(),
+                        VarCharType.ofEmptyLiteral(),
+                        BinaryType.ofEmptyLiteral(),
+                        VarBinaryType.ofEmptyLiteral()),
+                // registered structured type
+                StructuredType.newBuilder(
+                                ObjectIdentifier.of("cat", "db", "structuredType"),
+                                DataTypeJsonSerdeTest.PojoClass.class)
+                        .attributes(
+                                Arrays.asList(
+                                        new StructuredType.StructuredAttribute(
+                                                "f0", new IntType(true)),
+                                        new StructuredType.StructuredAttribute(
+                                                "f1", new BigIntType(true)),
+                                        new StructuredType.StructuredAttribute(
+                                                "f2", new VarCharType(200), "desc")))
+                        .comparison(StructuredType.StructuredComparison.FULL)
+                        .setFinal(false)
+                        .setInstantiable(false)
+                        .superType(
+                                StructuredType.newBuilder(
+                                                ObjectIdentifier.of("cat", "db", "structuredType2"))
+                                        .attributes(
+                                                Collections.singletonList(
+                                                        new StructuredType.StructuredAttribute(
+                                                                "f0", new BigIntType(false))))
+                                        .build())
+                        .description("description for StructuredType")
+                        .build(),
+                // unregistered structured type
+                StructuredType.newBuilder(PojoClass.class)
+                        .attributes(
+                                Arrays.asList(
+                                        new StructuredType.StructuredAttribute(
+                                                "f0", new IntType(true)),
+                                        new StructuredType.StructuredAttribute(
+                                                "f1", new BigIntType(true)),
+                                        new StructuredType.StructuredAttribute(
+                                                "f2", new VarCharType(200), "desc")))
+                        .build(),
+                // custom RawType
+                new RawType<>(LocalDateTime.class, LocalDateTimeSerializer.INSTANCE));
+    }
+
+    /** Implementation class of the unregistered structured type; fields match its attributes. */
+    public static class PojoClass {
+        public int f0; // attribute "f0", declared as IntType in the test data
+        public long f1; // attribute "f1", declared as BigIntType in the test data
+        public String f2; // attribute "f2", declared as VarCharType(200) in the test data
+    }
+}