You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/06 11:20:26 UTC

[GitHub] [flink] slinkydeveloper commented on a change in pull request #18274: [FLINK-25230][table-planner] Harden type serialization for LogicalType and DataType

slinkydeveloper commented on a change in pull request #18274:
URL: https://github.com/apache/flink/pull/18274#discussion_r779451037



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/DataTypes.java
##########
@@ -104,6 +106,9 @@
      * @see LogicalType#getDefaultConversion()
      */
     public static DataType of(LogicalType logicalType) {
+        Preconditions.checkArgument(
+                !LogicalTypeChecks.hasNested(logicalType, t -> t.is(LogicalTypeRoot.UNRESOLVED)),
+                "Unresolved logical types cannot be used to create a data type at this location.");
         return TypeConversions.fromLogicalToDataType(logicalType);

Review comment:
       I think we should do this check inside `fromLogicalToDataType`?

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LogicalTypeDataTypeConverter.java
##########
@@ -201,9 +200,29 @@ public DataType visit(RowType rowType) {
 
         @Override
         public DataType visit(DistinctType distinctType) {
-            return new FieldsDataType(
-                    distinctType,
-                    Collections.singletonList(distinctType.getSourceType().accept(this)));
+            final DataType sourceDataType = distinctType.getSourceType().accept(this);
+            if (sourceDataType instanceof AtomicDataType) {
+                return new AtomicDataType(distinctType, sourceDataType.getConversionClass());
+            } else if (sourceDataType instanceof CollectionDataType) {
+                final CollectionDataType collectionDataType = (CollectionDataType) sourceDataType;
+                return new CollectionDataType(
+                        distinctType,
+                        collectionDataType.getConversionClass(),
+                        collectionDataType.getElementDataType());
+            } else if (sourceDataType instanceof KeyValueDataType) {
+                final KeyValueDataType keyValueDataType = (KeyValueDataType) sourceDataType;
+                return new KeyValueDataType(
+                        distinctType,
+                        keyValueDataType.getConversionClass(),
+                        keyValueDataType.getKeyDataType(),
+                        keyValueDataType.getValueDataType());
+            } else if (sourceDataType instanceof FieldsDataType) {
+                return new FieldsDataType(
+                        distinctType,
+                        sourceDataType.getConversionClass(),
+                        sourceDataType.getChildren());
+            }
+            throw new IllegalStateException("Unexpected data type instance.");

Review comment:
       Why `distinctType.getSourceType().accept(this)` isn't enough? Why preserving the `distinctType`? If we need to preserve distinct type, then shouldn't exist some DataType implementation corresponding to distinct type?

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonDeserializer.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.FieldsDataType;
+import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerializer.FIELD_NAME_CONVERSION_CLASS;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerializer.FIELD_NAME_ELEMENT_CLASS;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerializer.FIELD_NAME_FIELDS;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerializer.FIELD_NAME_FIELD_NAME;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerializer.FIELD_NAME_KEY_CLASS;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerializer.FIELD_NAME_TYPE;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerializer.FIELD_NAME_VALUE_CLASS;
+
+/**
+ * JSON deserializer for {@link DataType}.
+ *
+ * @see DataTypeJsonSerializer for the reverse operation
+ */
+@Internal
+public class DataTypeJsonDeserializer extends StdDeserializer<DataType> {
+
+    public DataTypeJsonDeserializer() {
+        super(DataType.class);
+    }
+
+    @Override
+    public DataType deserialize(JsonParser jsonParser, DeserializationContext ctx)
+            throws IOException {
+        final JsonNode dataTypeNode = jsonParser.readValueAsTree();
+        final SerdeContext serdeContext = SerdeContext.from(ctx);
+        return deserialize(dataTypeNode, serdeContext);
+    }
+
+    public static DataType deserialize(JsonNode dataTypeNode, SerdeContext serdeContext) {
+        if (dataTypeNode.isTextual()) {
+            return deserializeWithInternalClass(dataTypeNode, serdeContext);
+        } else {
+            return deserializeWithExternalClass(dataTypeNode, serdeContext);
+        }
+    }
+
+    private static DataType deserializeWithInternalClass(
+            JsonNode logicalTypeNode, SerdeContext serdeContext) {
+        final LogicalType logicalType =
+                LogicalTypeJsonDeserializer.deserialize(logicalTypeNode, serdeContext);
+        return DataTypes.of(logicalType).toInternal();
+    }
+
+    private static DataType deserializeWithExternalClass(
+            JsonNode dataTypeNode, SerdeContext serdeContext) {
+        final LogicalType logicalType =
+                LogicalTypeJsonDeserializer.deserialize(
+                        dataTypeNode.get(FIELD_NAME_TYPE), serdeContext);
+        return deserializeClass(logicalType, dataTypeNode, serdeContext);
+    }
+
+    private static DataType deserializeClass(
+            LogicalType logicalType, @Nullable JsonNode classNode, SerdeContext serdeContext) {
+        if (classNode == null) {
+            return DataTypes.of(logicalType).toInternal();
+        }
+
+        final DataType dataType;
+        switch (logicalType.getTypeRoot()) {
+            case ARRAY:
+            case MULTISET:
+                final DataType elementDataType =
+                        deserializeClass(
+                                logicalType.getChildren().get(0),
+                                classNode.get(FIELD_NAME_ELEMENT_CLASS),
+                                serdeContext);
+                dataType = new CollectionDataType(logicalType, elementDataType);
+                break;
+
+            case MAP:
+                final MapType mapType = (MapType) logicalType;
+                final DataType keyDataType =
+                        deserializeClass(
+                                mapType.getKeyType(),
+                                classNode.get(FIELD_NAME_KEY_CLASS),
+                                serdeContext);
+                final DataType valueDataType =
+                        deserializeClass(
+                                mapType.getValueType(),
+                                classNode.get(FIELD_NAME_VALUE_CLASS),
+                                serdeContext);
+                dataType = new KeyValueDataType(mapType, keyDataType, valueDataType);
+                break;
+
+            case ROW:
+            case STRUCTURED_TYPE:
+                final List<String> fieldNames = LogicalTypeChecks.getFieldNames(logicalType);
+                final List<LogicalType> fieldTypes = LogicalTypeChecks.getFieldTypes(logicalType);
+
+                final ArrayNode fieldNodes = (ArrayNode) classNode.get(FIELD_NAME_FIELDS);
+                final Map<String, JsonNode> fieldNodesByName = new HashMap<>();
+                if (fieldNodes != null) {
+                    fieldNodes.forEach(
+                            fieldNode ->
+                                    fieldNodesByName.put(
+                                            fieldNode.get(FIELD_NAME_FIELD_NAME).asText(),
+                                            fieldNode));
+                }
+
+                final List<DataType> fieldDataTypes =
+                        IntStream.range(0, fieldNames.size())
+                                .mapToObj(
+                                        i -> {
+                                            final String fieldName = fieldNames.get(i);
+                                            final LogicalType fieldType = fieldTypes.get(i);
+                                            return deserializeClass(
+                                                    fieldType,
+                                                    fieldNodesByName.get(fieldName),
+                                                    serdeContext);
+                                        })
+                                .collect(Collectors.toList());
+
+                dataType = new FieldsDataType(logicalType, fieldDataTypes);
+                break;
+
+            case DISTINCT_TYPE:
+                final DistinctType distinctType = (DistinctType) logicalType;
+                dataType = deserializeClass(distinctType.getSourceType(), classNode, serdeContext);
+                break;
+
+            default:
+                dataType = DataTypes.of(logicalType).toInternal();
+        }
+
+        final Class<?> conversionClass =
+                loadClass(
+                        classNode.get(FIELD_NAME_CONVERSION_CLASS).asText(),
+                        serdeContext,
+                        String.format("conversion class of data type '%s'", dataType));
+        return dataType.bridgedTo(conversionClass);
+    }
+
+    private static Class<?> loadClass(
+            String className, SerdeContext serdeContext, String explanation) {
+        try {
+            return ClassUtils.getClass(serdeContext.getClassLoader(), className, true);

Review comment:
       Can you avoid using this utility from commons?

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerializer.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes.Field;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * JSON serializer for {@link LogicalType}.
+ *
+ * @see DataTypeJsonDeserializer for the reverse operation
+ */
+@Internal
+public final class DataTypeJsonSerializer extends StdSerializer<DataType> {
+    private static final long serialVersionUID = 1L;
+
+    /*
+    Example generated JSON for
+

Review comment:
       Can you add an example for a DataType not bridged, the ones using only the text representation of the `LogicalType`?

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeCatalogJsonSerdeTest.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanRestore;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.UserDefinedType;
+import org.apache.flink.table.types.utils.DataTypeFactoryMock;
+import org.apache.flink.table.utils.CatalogManagerMocks;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.junit.Test;

Review comment:
       Same, use JUnit 5

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerdeTest.java
##########
@@ -60,20 +53,21 @@
 import org.apache.flink.table.types.logical.TimestampKind;
 import org.apache.flink.table.types.logical.TimestampType;
 import org.apache.flink.table.types.logical.TinyIntType;
-import org.apache.flink.table.types.logical.TypeInformationRawType;
 import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.flink.table.types.logical.YearMonthIntervalType;
 import org.apache.flink.table.types.logical.ZonedTimestampType;
-import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.types.Row;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;

Review comment:
       JUnit 5

##########
File path: flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out
##########
@@ -21,21 +21,7 @@
             }
          },
          "id":1,
-         "outputType":{
-            "type":"ROW",
-            "nullable":true,
-            "fields":[
-               {
-                  "a":"BIGINT"
-               },
-               {
-                  "b":"INT"
-               },
-               {
-                  "c":"VARCHAR(2147483647)"
-               }
-            ]
-         },
+         "outputType":"ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",

Review comment:
       Nice!

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerdeTest.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.planner.calcite.FlinkContextImpl;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.utils.CatalogManagerMocks;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link DataType} serialization and deserialization. */
+@RunWith(Parameterized.class)
+public class DataTypeJsonSerdeTest {
+
+    @Parameter public DataType dataType;
+
+    @Test
+    public void testDataTypeSerde() throws IOException {
+        final ObjectMapper mapper = configuredObjectMapper();
+        final String json = toJson(mapper, dataType);
+        final DataType actual = toDataType(mapper, json);
+
+        if (json.contains("children")) {
+            System.out.println();
+        }

Review comment:
       Have you forgot to remove this after an intense debugging session?

##########
File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonSerdeTest.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.planner.calcite.FlinkContextImpl;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.utils.CatalogManagerMocks;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;

Review comment:
       Please use JUnit 5 with `ParametrizedTest` and `MethodSource`. We should not add JUnit 4 tests anymore.

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonDeserializer.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.FieldsDataType;
+import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerializer.FIELD_NAME_CONVERSION_CLASS;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerializer.FIELD_NAME_ELEMENT_CLASS;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerializer.FIELD_NAME_FIELDS;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerializer.FIELD_NAME_FIELD_NAME;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerializer.FIELD_NAME_KEY_CLASS;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerializer.FIELD_NAME_TYPE;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerializer.FIELD_NAME_VALUE_CLASS;
+
+/**
+ * JSON deserializer for {@link DataType}.
+ *
+ * @see DataTypeJsonSerializer for the reverse operation
+ */
+@Internal
+public class DataTypeJsonDeserializer extends StdDeserializer<DataType> {
+
+    public DataTypeJsonDeserializer() {
+        super(DataType.class);
+    }
+
+    @Override
+    public DataType deserialize(JsonParser jsonParser, DeserializationContext ctx)
+            throws IOException {
+        final JsonNode dataTypeNode = jsonParser.readValueAsTree();
+        final SerdeContext serdeContext = SerdeContext.from(ctx);
+        return deserialize(dataTypeNode, serdeContext);
+    }
+
+    public static DataType deserialize(JsonNode dataTypeNode, SerdeContext serdeContext) {
+        if (dataTypeNode.isTextual()) {

Review comment:
       Ignore this, I see you have it in Serializer

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DataTypeJsonDeserializer.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.FieldsDataType;
+import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+
+import org.apache.commons.lang3.ClassUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerializer.FIELD_NAME_CONVERSION_CLASS;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerializer.FIELD_NAME_ELEMENT_CLASS;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerializer.FIELD_NAME_FIELDS;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerializer.FIELD_NAME_FIELD_NAME;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerializer.FIELD_NAME_KEY_CLASS;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerializer.FIELD_NAME_TYPE;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.DataTypeJsonSerializer.FIELD_NAME_VALUE_CLASS;
+
+/**
+ * JSON deserializer for {@link DataType}.
+ *
+ * @see DataTypeJsonSerializer for the reverse operation
+ */
+@Internal
+public class DataTypeJsonDeserializer extends StdDeserializer<DataType> {
+
+    public DataTypeJsonDeserializer() {
+        super(DataType.class);
+    }
+
+    @Override
+    public DataType deserialize(JsonParser jsonParser, DeserializationContext ctx)
+            throws IOException {
+        final JsonNode dataTypeNode = jsonParser.readValueAsTree();
+        final SerdeContext serdeContext = SerdeContext.from(ctx);
+        return deserialize(dataTypeNode, serdeContext);
+    }
+
+    public static DataType deserialize(JsonNode dataTypeNode, SerdeContext serdeContext) {
+        if (dataTypeNode.isTextual()) {

Review comment:
       I see how it works here, but can you please explain it in the javadoc that we have 2 representations?

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DataTypeFactory.java
##########
@@ -112,4 +112,23 @@
      * edges of the API.
      */
     <T> DataType createRawDataType(TypeInformation<T> typeInfo);
+
+    // --------------------------------------------------------------------------------------------
+    // LogicalType creation
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Creates a {@link LogicalType} by a fully or partially defined name.
+     *
+     * <p>The factory will parse and resolve the name of a type to a {@link LogicalType}. This
+     * includes both built-in types and user-defined types (see {@link DistinctType} and {@link
+     * StructuredType}).
+     */
+    LogicalType createLogicalType(String name);

Review comment:
       `name` here is confusing, perhaps `typeString` as in the signature of `LogicalTypeParser` to specify that this is the serialized definition of the type?
   
   Same comment for `DataType createDataType(String name)`

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerializer.java
##########
@@ -109,349 +123,358 @@ public void serialize(
             JsonGenerator jsonGenerator,
             SerializerProvider serializerProvider)
             throws IOException {
-        if (logicalType instanceof CharType) {
-            // Zero-length character strings have no serializable string representation.
-            serializeRowType((CharType) logicalType, jsonGenerator);
-        } else if (logicalType instanceof VarCharType) {
-            // Zero-length character strings have no serializable string representation.
-            serializeVarCharType((VarCharType) logicalType, jsonGenerator);
-        } else if (logicalType instanceof BinaryType) {
-            // Zero-length binary strings have no serializable string representation.
-            serializeBinaryType((BinaryType) logicalType, jsonGenerator);
-        } else if (logicalType instanceof VarBinaryType) {
-            // Zero-length binary strings have no serializable string representation.
-            serializeVarBinaryType((VarBinaryType) logicalType, jsonGenerator);
-        } else if (logicalType instanceof SymbolType) {
-            // SymbolType does not support `asSerializableString`
-            serializeSymbolType((SymbolType<?>) logicalType, jsonGenerator);
-        } else if (logicalType instanceof TypeInformationRawType) {
-            // TypeInformationRawType does not support `asSerializableString`
-            serializeTypeInformationRawType((TypeInformationRawType<?>) logicalType, jsonGenerator);
-        } else if (logicalType instanceof StructuredType) {
-            //  StructuredType does not full support `asSerializableString`
-            serializeStructuredType((StructuredType) logicalType, jsonGenerator);
-        } else if (logicalType instanceof DistinctType) {
-            //  DistinctType does not full support `asSerializableString`
-            serializeDistinctType((DistinctType) logicalType, jsonGenerator);
-        } else if (logicalType instanceof TimestampType) {
-            // TimestampType does not consider `TimestampKind`
-            serializeTimestampType((TimestampType) logicalType, jsonGenerator);
-        } else if (logicalType instanceof ZonedTimestampType) {
-            // ZonedTimestampType does not consider `TimestampKind`
-            serializeZonedTimestampType((ZonedTimestampType) logicalType, jsonGenerator);
-        } else if (logicalType instanceof LocalZonedTimestampType) {
-            // LocalZonedTimestampType does not consider `TimestampKind`
-            serializeLocalZonedTimestampType((LocalZonedTimestampType) logicalType, jsonGenerator);
-        } else if (logicalType instanceof RowType) {
-            serializeRowType((RowType) logicalType, jsonGenerator, serializerProvider);
-        } else if (logicalType instanceof MapType) {
-            serializeMapType((MapType) logicalType, jsonGenerator, serializerProvider);
-        } else if (logicalType instanceof ArrayType) {
-            serializeArrayType((ArrayType) logicalType, jsonGenerator, serializerProvider);
-        } else if (logicalType instanceof MultisetType) {
-            serializeMultisetType((MultisetType) logicalType, jsonGenerator, serializerProvider);
-        } else if (logicalType instanceof RawType) {
-            serializeRawType((RawType<?>) logicalType, jsonGenerator, serializerProvider);
-        } else if (logicalType instanceof UnresolvedUserDefinedType) {
-            throw new TableException(
-                    "Can not serialize an UnresolvedUserDefinedType instance. \n"
-                            + "It needs to be resolved into a proper user-defined type.\"");
-        } else {
-            jsonGenerator.writeObject(logicalType.asSerializableString());
-        }
+        final ReadableConfig config = SerdeContext.from(serializerProvider).getConfiguration();
+        final boolean serializeCatalogObjects =
+                !config.get(TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS)
+                        .equals(CatalogPlanCompilation.IDENTIFIER);
+        serializeInternal(logicalType, jsonGenerator, serializeCatalogObjects);
     }
 
-    private void serializeRowType(
-            RowType rowType, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
+    private static void serializeInternal(
+            LogicalType logicalType, JsonGenerator jsonGenerator, boolean serializeCatalogObjects)
             throws IOException {
-        jsonGenerator.writeStartObject();
-        jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, rowType.getTypeRoot().name());
-        jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, rowType.isNullable());
-        List<RowType.RowField> fields = rowType.getFields();
-        jsonGenerator.writeArrayFieldStart(FIELD_NAME_FIELDS);
-        for (RowType.RowField rowField : fields) {
-            jsonGenerator.writeStartObject();
-            jsonGenerator.writeFieldName(rowField.getName());
-            serialize(rowField.getType(), jsonGenerator, serializerProvider);
-            if (rowField.getDescription().isPresent()) {
-                jsonGenerator.writeStringField(
-                        FIELD_NAME_DESCRIPTION, rowField.getDescription().get());
-            }
-            jsonGenerator.writeEndObject();
+        if (supportsCompactSerialization(logicalType, serializeCatalogObjects)) {
+            serializeTypeWithCompactSerialization(logicalType, jsonGenerator);
+        } else {
+            // fallback to generic serialization that might still use compact serialization for
+            // individual fields
+            serializeTypeWithGenericSerialization(
+                    logicalType, jsonGenerator, serializeCatalogObjects);
         }
-        jsonGenerator.writeEndArray();
-        jsonGenerator.writeEndObject();
     }
 
-    private void serializeMapType(
-            MapType mapType, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
+    // --------------------------------------------------------------------------------------------
+    // Generic Serialization
+    // --------------------------------------------------------------------------------------------
+
+    private static void serializeTypeWithGenericSerialization(
+            LogicalType logicalType, JsonGenerator jsonGenerator, boolean serializeCatalogObjects)
             throws IOException {
         jsonGenerator.writeStartObject();
-        jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, mapType.getTypeRoot().name());
-        jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, mapType.isNullable());
-        jsonGenerator.writeFieldName(FIELD_NAME_KEY_TYPE);
-        serialize(mapType.getKeyType(), jsonGenerator, serializerProvider);
-        jsonGenerator.writeFieldName(FIELD_NAME_VALUE_TYPE);
-        serialize(mapType.getValueType(), jsonGenerator, serializerProvider);
+
+        jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, logicalType.getTypeRoot().name());
+        if (!logicalType.isNullable()) {
+            jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, false);
+        }
+
+        switch (logicalType.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+            case BINARY:
+            case VARBINARY:
+                serializeZeroLengthString(jsonGenerator);
+                break;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                final TimestampType timestampType = (TimestampType) logicalType;
+                serializeTimestamp(
+                        timestampType.getPrecision(), timestampType.getKind(), jsonGenerator);
+                break;
+            case TIMESTAMP_WITH_TIME_ZONE:
+                final ZonedTimestampType zonedTimestampType = (ZonedTimestampType) logicalType;
+                serializeTimestamp(
+                        zonedTimestampType.getPrecision(),
+                        zonedTimestampType.getKind(),
+                        jsonGenerator);
+                break;
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                final LocalZonedTimestampType localZonedTimestampType =
+                        (LocalZonedTimestampType) logicalType;
+                serializeTimestamp(
+                        localZonedTimestampType.getPrecision(),
+                        localZonedTimestampType.getKind(),
+                        jsonGenerator);
+                break;
+            case ARRAY:
+                serializeCollection(
+                        ((ArrayType) logicalType).getElementType(),
+                        jsonGenerator,
+                        serializeCatalogObjects);
+                break;
+            case MULTISET:
+                serializeCollection(
+                        ((MultisetType) logicalType).getElementType(),
+                        jsonGenerator,
+                        serializeCatalogObjects);
+                break;
+            case MAP:
+                serializeMap((MapType) logicalType, jsonGenerator, serializeCatalogObjects);
+                break;
+            case ROW:
+                serializeRow((RowType) logicalType, jsonGenerator, serializeCatalogObjects);
+                break;
+            case DISTINCT_TYPE:
+                serializeDistinctType(
+                        (DistinctType) logicalType, jsonGenerator, serializeCatalogObjects);
+                break;
+            case STRUCTURED_TYPE:
+                serializeStructuredType(
+                        (StructuredType) logicalType, jsonGenerator, serializeCatalogObjects);
+                break;
+            case SYMBOL:
+                // type root is enough
+                break;
+            case RAW:
+                if (logicalType instanceof RawType) {
+                    serializeSpecializedRaw((RawType<?>) logicalType, jsonGenerator);
+                    break;
+                }
+                // fall through
+            default:
+                throw new ValidationException(
+                        String.format(
+                                "Unable to serialize logical type '%s'. Please check the documentation for supported types.",
+                                logicalType.asSummaryString()));
+        }
+
         jsonGenerator.writeEndObject();
     }
 
-    private void serializeArrayType(
-            ArrayType arrayType, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
-            throws IOException {
-        jsonGenerator.writeStartObject();
-        jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, arrayType.getTypeRoot().name());
-        jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, arrayType.isNullable());
-        jsonGenerator.writeFieldName(FIELD_NAME_ELEMENT_TYPE);
-        serialize(arrayType.getElementType(), jsonGenerator, serializerProvider);
-        jsonGenerator.writeEndObject();
+    private static void serializeZeroLengthString(JsonGenerator jsonGenerator) throws IOException {
+        jsonGenerator.writeNumberField(FIELD_NAME_LENGTH, 0);
     }
 
-    private void serializeMultisetType(
-            MultisetType multisetType,
-            JsonGenerator jsonGenerator,
-            SerializerProvider serializerProvider)
-            throws IOException {
-        jsonGenerator.writeStartObject();
-        jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, multisetType.getTypeRoot().name());
-        jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, multisetType.isNullable());
-        jsonGenerator.writeFieldName(FIELD_NAME_ELEMENT_TYPE);
-        serialize(multisetType.getElementType(), jsonGenerator, serializerProvider);
-        jsonGenerator.writeEndObject();
+    private static void serializeTimestamp(
+            int precision, TimestampKind kind, JsonGenerator jsonGenerator) throws IOException {
+        jsonGenerator.writeNumberField(FIELD_NAME_PRECISION, precision);
+        jsonGenerator.writeObjectField(FIELD_NAME_TIMESTAMP_KIND, kind);
     }
 
-    private void serializeRowType(CharType charType, JsonGenerator jsonGenerator)
+    private static void serializeCollection(
+            LogicalType elementType, JsonGenerator jsonGenerator, boolean serializeCatalogObjects)
             throws IOException {
-        // Zero-length character strings have no serializable string representation.
-        if (charType.getLength() == CharType.EMPTY_LITERAL_LENGTH) {
-            jsonGenerator.writeStartObject();
-            jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, charType.getTypeRoot().name());
-            jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, charType.isNullable());
-            jsonGenerator.writeNumberField(FIELD_NAME_LENGTH, 0);
-            jsonGenerator.writeEndObject();
-        } else {
-            jsonGenerator.writeObject(charType.asSerializableString());
-        }
+        jsonGenerator.writeFieldName(FIELD_NAME_ELEMENT_TYPE);
+        serializeInternal(elementType, jsonGenerator, serializeCatalogObjects);
     }
 
-    private void serializeVarCharType(VarCharType varCharType, JsonGenerator jsonGenerator)
+    private static void serializeMap(
+            MapType mapType, JsonGenerator jsonGenerator, boolean serializeCatalogObjects)
             throws IOException {
-        // Zero-length character strings have no serializable string representation.
-        if (varCharType.getLength() == VarCharType.EMPTY_LITERAL_LENGTH) {
-            jsonGenerator.writeStartObject();
-            jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, varCharType.getTypeRoot().name());
-            jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, varCharType.isNullable());
-            jsonGenerator.writeNumberField(FIELD_NAME_LENGTH, 0);
-            jsonGenerator.writeEndObject();
-        } else {
-            jsonGenerator.writeObject(varCharType.asSerializableString());
-        }
+        jsonGenerator.writeFieldName(FIELD_NAME_KEY_TYPE);
+        serializeInternal(mapType.getKeyType(), jsonGenerator, serializeCatalogObjects);
+        jsonGenerator.writeFieldName(FIELD_NAME_VALUE_TYPE);
+        serializeInternal(mapType.getValueType(), jsonGenerator, serializeCatalogObjects);
     }
 
-    private void serializeBinaryType(BinaryType binaryType, JsonGenerator jsonGenerator)
+    private static void serializeRow(
+            RowType rowType, JsonGenerator jsonGenerator, boolean serializeCatalogObjects)
             throws IOException {
-        // Zero-length binary strings have no serializable string representation.
-        if (binaryType.getLength() == BinaryType.EMPTY_LITERAL_LENGTH) {
+        jsonGenerator.writeArrayFieldStart(FIELD_NAME_FIELDS);
+        for (RowType.RowField rowField : rowType.getFields()) {
             jsonGenerator.writeStartObject();
-            jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, binaryType.getTypeRoot().name());
-            jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, binaryType.isNullable());
-            jsonGenerator.writeNumberField(FIELD_NAME_LENGTH, 0);
+            jsonGenerator.writeStringField(FIELD_NAME_FIELD_NAME, rowField.getName());
+            jsonGenerator.writeFieldName(FIELD_NAME_FIELD_TYPE);
+            serializeInternal(rowField.getType(), jsonGenerator, serializeCatalogObjects);
+            if (rowField.getDescription().isPresent()) {
+                jsonGenerator.writeStringField(
+                        FIELD_NAME_FIELD_DESCRIPTION, rowField.getDescription().get());
+            }
             jsonGenerator.writeEndObject();
-        } else {
-            jsonGenerator.writeObject(binaryType.asSerializableString());
         }
+        jsonGenerator.writeEndArray();
     }
 
-    private void serializeVarBinaryType(VarBinaryType varBinaryType, JsonGenerator jsonGenerator)
+    private static void serializeDistinctType(
+            DistinctType distinctType, JsonGenerator jsonGenerator, boolean serializeCatalogObjects)
             throws IOException {
-        // Zero-length binary strings have no serializable string representation.
-        if (varBinaryType.getLength() == VarBinaryType.EMPTY_LITERAL_LENGTH) {
-            jsonGenerator.writeStartObject();
+        jsonGenerator.writeObjectField(
+                FIELD_NAME_OBJECT_IDENTIFIER,
+                distinctType.getObjectIdentifier().orElseThrow(IllegalStateException::new));
+        if (distinctType.getDescription().isPresent()) {
             jsonGenerator.writeStringField(
-                    FIELD_NAME_TYPE_NAME, varBinaryType.getTypeRoot().name());
-            jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, varBinaryType.isNullable());
-            jsonGenerator.writeNumberField(FIELD_NAME_LENGTH, 0);
-            jsonGenerator.writeEndObject();
-        } else {
-            jsonGenerator.writeObject(varBinaryType.asSerializableString());
+                    FIELD_NAME_FIELD_DESCRIPTION, distinctType.getDescription().get());
         }
+        jsonGenerator.writeFieldName(FIELD_NAME_SOURCE_TYPE);
+        serializeInternal(distinctType.getSourceType(), jsonGenerator, serializeCatalogObjects);
     }
 
-    private void serializeSymbolType(SymbolType<?> symbolType, JsonGenerator jsonGenerator)
-            throws IOException {
-        jsonGenerator.writeStartObject();
-        jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, symbolType.isNullable());
-        jsonGenerator.writeStringField(
-                FIELD_NAME_SYMBOL_CLASS, symbolType.getDefaultConversion().getName());
-        jsonGenerator.writeEndObject();
-    }
-
-    private void serializeTypeInformationRawType(
-            TypeInformationRawType<?> rawType, JsonGenerator jsonGenerator) throws IOException {
-        jsonGenerator.writeStartObject();
-        jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, rawType.isNullable());
-        jsonGenerator.writeStringField(
-                FIELD_NAME_TYPE_INFO,
-                EncodingUtils.encodeObjectToString(rawType.getTypeInformation()));
-        jsonGenerator.writeEndObject();
-    }
-
-    private void serializeStructuredType(StructuredType structuredType, JsonGenerator jsonGenerator)
+    private static void serializeStructuredType(
+            StructuredType structuredType,
+            JsonGenerator jsonGenerator,
+            boolean serializeCatalogObjects)
             throws IOException {
-        jsonGenerator.writeStartObject();
-        jsonGenerator.writeStringField(
-                FIELD_NAME_TYPE_NAME, LogicalTypeRoot.STRUCTURED_TYPE.name());
-        jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, structuredType.isNullable());
         if (structuredType.getObjectIdentifier().isPresent()) {
             jsonGenerator.writeObjectField(
-                    FIELD_NAME_IDENTIFIER, structuredType.getObjectIdentifier().get());
+                    FIELD_NAME_OBJECT_IDENTIFIER, structuredType.getObjectIdentifier().get());
         }
-        if (structuredType.getImplementationClass().isPresent()) {
+        if (structuredType.getDescription().isPresent()) {
             jsonGenerator.writeStringField(
-                    FIELD_NAME_IMPLEMENTATION_CLASS,
-                    structuredType.getImplementationClass().get().getName());
+                    FIELD_NAME_DESCRIPTION, structuredType.getDescription().get());
+        }
+        if (structuredType.getImplementationClass().isPresent()) {
+            jsonGenerator.writeObjectField(
+                    FIELD_NAME_IMPLEMENTATION_CLASS, structuredType.getImplementationClass().get());
         }
         jsonGenerator.writeFieldName(FIELD_NAME_ATTRIBUTES);
         jsonGenerator.writeStartArray();
-        for (StructuredType.StructuredAttribute attribute : structuredType.getAttributes()) {
+        for (StructuredAttribute attribute : structuredType.getAttributes()) {
             jsonGenerator.writeStartObject();
-            jsonGenerator.writeStringField(FIELD_NAME_NAME, attribute.getName());
-            jsonGenerator.writeObjectField(FIELD_NAME_LOGICAL_TYPE, attribute.getType());
+            jsonGenerator.writeStringField(FIELD_NAME_ATTRIBUTE_NAME, attribute.getName());
+            jsonGenerator.writeFieldName(FIELD_NAME_ATTRIBUTE_TYPE);
+            serializeInternal(attribute.getType(), jsonGenerator, serializeCatalogObjects);
             if (attribute.getDescription().isPresent()) {
                 jsonGenerator.writeStringField(
-                        FIELD_NAME_DESCRIPTION, attribute.getDescription().get());
+                        FIELD_NAME_ATTRIBUTE_DESCRIPTION, attribute.getDescription().get());
             }
             jsonGenerator.writeEndObject();
         }
         jsonGenerator.writeEndArray();
-        jsonGenerator.writeBooleanField(FIELD_NAME_FINAL, structuredType.isFinal());
-        jsonGenerator.writeBooleanField(FIELD_NAME_INSTANTIABLE, structuredType.isInstantiable());
-        jsonGenerator.writeStringField(
-                FIELD_NAME_COMPARISON, structuredType.getComparison().name());
-        if (structuredType.getSuperType().isPresent()) {
-            jsonGenerator.writeObjectField(
-                    FIELD_NAME_SUPPER_TYPE, structuredType.getSuperType().get());
+        if (!structuredType.isFinal()) {
+            jsonGenerator.writeBooleanField(FIELD_NAME_FINAL, false);
         }
-        if (structuredType.getDescription().isPresent()) {
+        if (!structuredType.isInstantiable()) {
+            jsonGenerator.writeBooleanField(FIELD_NAME_INSTANTIABLE, false);
+        }
+        if (structuredType.getComparison() != StructuredComparison.NONE) {
             jsonGenerator.writeStringField(
-                    FIELD_NAME_DESCRIPTION, structuredType.getDescription().get());
+                    FIELD_NAME_COMPARISON, structuredType.getComparison().name());
+        }
+        if (structuredType.getSuperType().isPresent()) {
+            jsonGenerator.writeObjectField(
+                    FIELD_NAME_SUPER_TYPE, structuredType.getSuperType().get());
         }
-        jsonGenerator.writeEndObject();
     }
 
-    private void serializeDistinctType(DistinctType distinctType, JsonGenerator jsonGenerator)
+    private static void serializeSpecializedRaw(RawType<?> rawType, JsonGenerator jsonGenerator)
             throws IOException {
-        jsonGenerator.writeStartObject();
-        jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, LogicalTypeRoot.DISTINCT_TYPE.name());
-        Preconditions.checkArgument(distinctType.getObjectIdentifier().isPresent());
-        jsonGenerator.writeObjectField(
-                FIELD_NAME_IDENTIFIER, distinctType.getObjectIdentifier().get());
-        jsonGenerator.writeObjectField(FIELD_NAME_SOURCE_TYPE, distinctType.getSourceType());
-        if (distinctType.getDescription().isPresent()) {
+        jsonGenerator.writeStringField(FIELD_NAME_CLASS, rawType.getOriginatingClass().getName());
+        final TypeSerializer<?> serializer = rawType.getTypeSerializer();
+        if (serializer.equals(NullSerializer.INSTANCE)) {
             jsonGenerator.writeStringField(
-                    FIELD_NAME_DESCRIPTION, distinctType.getDescription().get());
+                    FIELD_NAME_SPECIAL_SERIALIZER, FIELD_VALUE_EXTERNAL_SERIALIZER_NULL);
+        } else if (serializer instanceof ExternalSerializer) {
+            final ExternalSerializer<?, ?> externalSerializer =
+                    (ExternalSerializer<?, ?>) rawType.getTypeSerializer();
+            if (externalSerializer.isInternalInput()) {
+                throw new TableException(
+                        "Asymmetric external serializers are currently not supported. "
+                                + "The input must not be internal if the output is external.");
+            }
+            jsonGenerator.writeObjectField(
+                    FIELD_NAME_EXTERNAL_DATA_TYPE, externalSerializer.getDataType());
+        } else {
+            throw new TableException("Unsupported special case for RAW type.");
         }
-        jsonGenerator.writeEndObject();
     }
 
-    private void serializeTimestampType(TimestampType timestampType, JsonGenerator jsonGenerator)
-            throws IOException {
-        jsonGenerator.writeStartObject();
-        jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, timestampType.getTypeRoot().name());
-        jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, timestampType.isNullable());
-        jsonGenerator.writeNumberField(FIELD_NAME_PRECISION, timestampType.getPrecision());
-        jsonGenerator.writeObjectField(FIELD_NAME_TIMESTAMP_KIND, timestampType.getKind());
-        jsonGenerator.writeEndObject();
-    }
+    // --------------------------------------------------------------------------------------------
+    // Compact Serialization
+    // --------------------------------------------------------------------------------------------
 
-    private void serializeZonedTimestampType(
-            ZonedTimestampType timestampType, JsonGenerator jsonGenerator) throws IOException {
-        jsonGenerator.writeStartObject();
-        jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, timestampType.getTypeRoot().name());
-        jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, timestampType.isNullable());
-        jsonGenerator.writeNumberField(FIELD_NAME_PRECISION, timestampType.getPrecision());
-        jsonGenerator.writeObjectField(FIELD_NAME_TIMESTAMP_KIND, timestampType.getKind());
-        jsonGenerator.writeEndObject();
+    private static boolean supportsCompactSerialization(
+            LogicalType logicalType, boolean serializeCatalogObjects) {
+        return logicalType.accept(new CompactSerializationChecker(serializeCatalogObjects));
     }
 
-    private void serializeLocalZonedTimestampType(
-            LocalZonedTimestampType timestampType, JsonGenerator jsonGenerator) throws IOException {
-        jsonGenerator.writeStartObject();
-        jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, timestampType.getTypeRoot().name());
-        jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, timestampType.isNullable());
-        jsonGenerator.writeNumberField(FIELD_NAME_PRECISION, timestampType.getPrecision());
-        jsonGenerator.writeObjectField(FIELD_NAME_TIMESTAMP_KIND, timestampType.getKind());
-        jsonGenerator.writeEndObject();
+    private static void serializeTypeWithCompactSerialization(
+            LogicalType logicalType, JsonGenerator jsonGenerator) throws IOException {
+        final String compactString = logicalType.asSerializableString();
+        jsonGenerator.writeString(compactString);
     }
 
-    @SuppressWarnings("rawtypes")
-    private void serializeRawType(
-            RawType<?> rawType, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
-            throws IOException {
-        TypeSerializer<?> typeSer = rawType.getTypeSerializer();
-        if (typeSer instanceof ExternalSerializer) {
-            ExternalSerializer externalSer = (ExternalSerializer) typeSer;
-            // Currently, ExternalSerializer with `isInternalInput=false` will be serialized,
-            // Once `isInternalInput=true` needs to be serialized, we can add individual field in
-            // the json to support it, and the new json plan is compatible with the previous one.
-            if (externalSer.isInternalInput()) {
-                throw new TableException(
-                        "ExternalSerializer with `isInternalInput=true` is not supported.");
-            }
-            DataType dataType = externalSer.getDataType();
-            boolean isMapView = DataViewUtils.isMapViewDataType(dataType);
-            boolean isListView = DataViewUtils.isListViewDataType(dataType);
-            if (isMapView || isListView) {
-                jsonGenerator.writeStartObject();
-                jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, LogicalTypeRoot.RAW.name());
-                jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, rawType.isNullable());
-                if (isMapView) {
-                    jsonGenerator.writeStringField(
-                            FIELD_NAME_DATA_VIEW_CLASS, MapView.class.getName());
-                    KeyValueDataType keyValueDataType =
-                            DataViewUtils.extractKeyValueDataTypeForMapView(dataType);
-                    serializeDataTypeForDataView(
-                            FIELD_NAME_KEY_TYPE,
-                            keyValueDataType.getKeyDataType(),
-                            jsonGenerator,
-                            serializerProvider);
-                    serializeDataTypeForDataView(
-                            FIELD_NAME_VALUE_TYPE,
-                            keyValueDataType.getValueDataType(),
-                            jsonGenerator,
-                            serializerProvider);
-                } else {
-                    jsonGenerator.writeStringField(
-                            FIELD_NAME_DATA_VIEW_CLASS, ListView.class.getName());
-                    DataType elementType =
-                            DataViewUtils.extractElementDataTypeForListView(dataType);
-                    serializeDataTypeForDataView(
-                            FIELD_NAME_ELEMENT_TYPE,
-                            elementType,
-                            jsonGenerator,
-                            serializerProvider);
-                }
-                jsonGenerator.writeEndObject();
-                return;
-            }
+    /**
+     * Checks whether the given type can be serialized as a compact string created from {@link
+     * LogicalType#asSerializableString()}.
+     */
+    private static class CompactSerializationChecker extends LogicalTypeDefaultVisitor<Boolean> {
+
+        private final boolean serializeCatalogObjects;
+
+        CompactSerializationChecker(boolean serializeCatalogObjects) {
+            this.serializeCatalogObjects = serializeCatalogObjects;
         }
 
-        jsonGenerator.writeObject(rawType.asSerializableString());
-    }
+        @Override
+        public Boolean visit(CharType charType) {
+            return charType.getLength() > 0;
+        }
 
-    private void serializeDataTypeForDataView(
-            String key,
-            DataType dataType,
-            JsonGenerator jsonGenerator,
-            SerializerProvider serializerProvider)
-            throws IOException {
-        jsonGenerator.writeFieldName(key);
-        jsonGenerator.writeStartObject();
-        jsonGenerator.writeBooleanField(
-                FIELD_NAME_IS_INTERNAL_TYPE, DataTypeUtils.isInternal(dataType));
-        jsonGenerator.writeFieldName(FIELD_NAME_TYPE_NAME);
-        LogicalType logicalType = LogicalTypeDataTypeConverter.toLogicalType(dataType);
-        serialize(logicalType, jsonGenerator, serializerProvider);
-        jsonGenerator.writeEndObject();
+        @Override
+        public Boolean visit(VarCharType varCharType) {
+            return varCharType.getLength() > 0;
+        }
+
+        @Override
+        public Boolean visit(BinaryType binaryType) {
+            return binaryType.getLength() > 0;
+        }
+
+        @Override
+        public Boolean visit(VarBinaryType varBinaryType) {
+            return varBinaryType.getLength() > 0;
+        }
+
+        @Override
+        public Boolean visit(TimestampType timestampType) {
+            return timestampType.getKind() == TimestampKind.REGULAR;
+        }
+
+        @Override
+        public Boolean visit(ZonedTimestampType zonedTimestampType) {
+            return zonedTimestampType.getKind() == TimestampKind.REGULAR;
+        }
+
+        @Override
+        public Boolean visit(LocalZonedTimestampType localZonedTimestampType) {
+            return localZonedTimestampType.getKind() == TimestampKind.REGULAR;
+        }
+
+        @Override
+        public Boolean visit(DistinctType distinctType) {
+            // catalog-based distinct types are always string serializable,
+            // however, depending on the configuration, we serialize the entire type
+            return !serializeCatalogObjects;
+        }
+
+        @Override
+        public Boolean visit(StructuredType structuredType) {
+            // catalog-based structured types are always string serializable,
+            // however, depending on the configuration, we serialize the entire type
+            return structuredType.getObjectIdentifier().isPresent() && !serializeCatalogObjects;

Review comment:
       In case the structured type doesn't have an object identifier, aren't we able to compat serialize structured types which children are compat serializable? In the same fashion we do for row.

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerializer.java
##########
@@ -109,349 +123,358 @@ public void serialize(
             JsonGenerator jsonGenerator,
             SerializerProvider serializerProvider)
             throws IOException {
-        if (logicalType instanceof CharType) {
-            // Zero-length character strings have no serializable string representation.
-            serializeRowType((CharType) logicalType, jsonGenerator);
-        } else if (logicalType instanceof VarCharType) {
-            // Zero-length character strings have no serializable string representation.
-            serializeVarCharType((VarCharType) logicalType, jsonGenerator);
-        } else if (logicalType instanceof BinaryType) {
-            // Zero-length binary strings have no serializable string representation.
-            serializeBinaryType((BinaryType) logicalType, jsonGenerator);
-        } else if (logicalType instanceof VarBinaryType) {
-            // Zero-length binary strings have no serializable string representation.
-            serializeVarBinaryType((VarBinaryType) logicalType, jsonGenerator);
-        } else if (logicalType instanceof SymbolType) {
-            // SymbolType does not support `asSerializableString`
-            serializeSymbolType((SymbolType<?>) logicalType, jsonGenerator);
-        } else if (logicalType instanceof TypeInformationRawType) {
-            // TypeInformationRawType does not support `asSerializableString`
-            serializeTypeInformationRawType((TypeInformationRawType<?>) logicalType, jsonGenerator);
-        } else if (logicalType instanceof StructuredType) {
-            //  StructuredType does not full support `asSerializableString`
-            serializeStructuredType((StructuredType) logicalType, jsonGenerator);
-        } else if (logicalType instanceof DistinctType) {
-            //  DistinctType does not full support `asSerializableString`
-            serializeDistinctType((DistinctType) logicalType, jsonGenerator);
-        } else if (logicalType instanceof TimestampType) {
-            // TimestampType does not consider `TimestampKind`
-            serializeTimestampType((TimestampType) logicalType, jsonGenerator);
-        } else if (logicalType instanceof ZonedTimestampType) {
-            // ZonedTimestampType does not consider `TimestampKind`
-            serializeZonedTimestampType((ZonedTimestampType) logicalType, jsonGenerator);
-        } else if (logicalType instanceof LocalZonedTimestampType) {
-            // LocalZonedTimestampType does not consider `TimestampKind`
-            serializeLocalZonedTimestampType((LocalZonedTimestampType) logicalType, jsonGenerator);
-        } else if (logicalType instanceof RowType) {
-            serializeRowType((RowType) logicalType, jsonGenerator, serializerProvider);
-        } else if (logicalType instanceof MapType) {
-            serializeMapType((MapType) logicalType, jsonGenerator, serializerProvider);
-        } else if (logicalType instanceof ArrayType) {
-            serializeArrayType((ArrayType) logicalType, jsonGenerator, serializerProvider);
-        } else if (logicalType instanceof MultisetType) {
-            serializeMultisetType((MultisetType) logicalType, jsonGenerator, serializerProvider);
-        } else if (logicalType instanceof RawType) {
-            serializeRawType((RawType<?>) logicalType, jsonGenerator, serializerProvider);
-        } else if (logicalType instanceof UnresolvedUserDefinedType) {
-            throw new TableException(
-                    "Can not serialize an UnresolvedUserDefinedType instance. \n"
-                            + "It needs to be resolved into a proper user-defined type.\"");
-        } else {
-            jsonGenerator.writeObject(logicalType.asSerializableString());
-        }
+        final ReadableConfig config = SerdeContext.from(serializerProvider).getConfiguration();
+        final boolean serializeCatalogObjects =
+                !config.get(TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS)
+                        .equals(CatalogPlanCompilation.IDENTIFIER);
+        serializeInternal(logicalType, jsonGenerator, serializeCatalogObjects);
     }
 
-    private void serializeRowType(
-            RowType rowType, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
+    private static void serializeInternal(
+            LogicalType logicalType, JsonGenerator jsonGenerator, boolean serializeCatalogObjects)
             throws IOException {
-        jsonGenerator.writeStartObject();
-        jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, rowType.getTypeRoot().name());
-        jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, rowType.isNullable());
-        List<RowType.RowField> fields = rowType.getFields();
-        jsonGenerator.writeArrayFieldStart(FIELD_NAME_FIELDS);
-        for (RowType.RowField rowField : fields) {
-            jsonGenerator.writeStartObject();
-            jsonGenerator.writeFieldName(rowField.getName());
-            serialize(rowField.getType(), jsonGenerator, serializerProvider);
-            if (rowField.getDescription().isPresent()) {
-                jsonGenerator.writeStringField(
-                        FIELD_NAME_DESCRIPTION, rowField.getDescription().get());
-            }
-            jsonGenerator.writeEndObject();
+        if (supportsCompactSerialization(logicalType, serializeCatalogObjects)) {
+            serializeTypeWithCompactSerialization(logicalType, jsonGenerator);
+        } else {
+            // fallback to generic serialization that might still use compact serialization for
+            // individual fields
+            serializeTypeWithGenericSerialization(
+                    logicalType, jsonGenerator, serializeCatalogObjects);
         }
-        jsonGenerator.writeEndArray();
-        jsonGenerator.writeEndObject();
     }
 
-    private void serializeMapType(
-            MapType mapType, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
+    // --------------------------------------------------------------------------------------------
+    // Generic Serialization
+    // --------------------------------------------------------------------------------------------
+
+    private static void serializeTypeWithGenericSerialization(
+            LogicalType logicalType, JsonGenerator jsonGenerator, boolean serializeCatalogObjects)
             throws IOException {
         jsonGenerator.writeStartObject();
-        jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, mapType.getTypeRoot().name());
-        jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, mapType.isNullable());
-        jsonGenerator.writeFieldName(FIELD_NAME_KEY_TYPE);
-        serialize(mapType.getKeyType(), jsonGenerator, serializerProvider);
-        jsonGenerator.writeFieldName(FIELD_NAME_VALUE_TYPE);
-        serialize(mapType.getValueType(), jsonGenerator, serializerProvider);
+
+        jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, logicalType.getTypeRoot().name());
+        if (!logicalType.isNullable()) {
+            jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, false);
+        }
+
+        switch (logicalType.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+            case BINARY:
+            case VARBINARY:
+                serializeZeroLengthString(jsonGenerator);
+                break;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                final TimestampType timestampType = (TimestampType) logicalType;
+                serializeTimestamp(
+                        timestampType.getPrecision(), timestampType.getKind(), jsonGenerator);
+                break;
+            case TIMESTAMP_WITH_TIME_ZONE:
+                final ZonedTimestampType zonedTimestampType = (ZonedTimestampType) logicalType;
+                serializeTimestamp(
+                        zonedTimestampType.getPrecision(),
+                        zonedTimestampType.getKind(),
+                        jsonGenerator);
+                break;
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                final LocalZonedTimestampType localZonedTimestampType =
+                        (LocalZonedTimestampType) logicalType;
+                serializeTimestamp(
+                        localZonedTimestampType.getPrecision(),
+                        localZonedTimestampType.getKind(),
+                        jsonGenerator);
+                break;
+            case ARRAY:
+                serializeCollection(
+                        ((ArrayType) logicalType).getElementType(),
+                        jsonGenerator,
+                        serializeCatalogObjects);
+                break;
+            case MULTISET:
+                serializeCollection(
+                        ((MultisetType) logicalType).getElementType(),
+                        jsonGenerator,
+                        serializeCatalogObjects);
+                break;
+            case MAP:
+                serializeMap((MapType) logicalType, jsonGenerator, serializeCatalogObjects);
+                break;
+            case ROW:
+                serializeRow((RowType) logicalType, jsonGenerator, serializeCatalogObjects);
+                break;
+            case DISTINCT_TYPE:
+                serializeDistinctType(
+                        (DistinctType) logicalType, jsonGenerator, serializeCatalogObjects);
+                break;
+            case STRUCTURED_TYPE:
+                serializeStructuredType(
+                        (StructuredType) logicalType, jsonGenerator, serializeCatalogObjects);
+                break;
+            case SYMBOL:
+                // type root is enough
+                break;
+            case RAW:
+                if (logicalType instanceof RawType) {
+                    serializeSpecializedRaw((RawType<?>) logicalType, jsonGenerator);
+                    break;
+                }
+                // fall through
+            default:
+                throw new ValidationException(
+                        String.format(
+                                "Unable to serialize logical type '%s'. Please check the documentation for supported types.",
+                                logicalType.asSummaryString()));
+        }
+
         jsonGenerator.writeEndObject();
     }
 
-    private void serializeArrayType(
-            ArrayType arrayType, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
-            throws IOException {
-        jsonGenerator.writeStartObject();
-        jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, arrayType.getTypeRoot().name());
-        jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, arrayType.isNullable());
-        jsonGenerator.writeFieldName(FIELD_NAME_ELEMENT_TYPE);
-        serialize(arrayType.getElementType(), jsonGenerator, serializerProvider);
-        jsonGenerator.writeEndObject();
+    private static void serializeZeroLengthString(JsonGenerator jsonGenerator) throws IOException {
+        jsonGenerator.writeNumberField(FIELD_NAME_LENGTH, 0);
     }
 
-    private void serializeMultisetType(
-            MultisetType multisetType,
-            JsonGenerator jsonGenerator,
-            SerializerProvider serializerProvider)
-            throws IOException {
-        jsonGenerator.writeStartObject();
-        jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, multisetType.getTypeRoot().name());
-        jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, multisetType.isNullable());
-        jsonGenerator.writeFieldName(FIELD_NAME_ELEMENT_TYPE);
-        serialize(multisetType.getElementType(), jsonGenerator, serializerProvider);
-        jsonGenerator.writeEndObject();
+    private static void serializeTimestamp(
+            int precision, TimestampKind kind, JsonGenerator jsonGenerator) throws IOException {
+        jsonGenerator.writeNumberField(FIELD_NAME_PRECISION, precision);
+        jsonGenerator.writeObjectField(FIELD_NAME_TIMESTAMP_KIND, kind);
     }
 
-    private void serializeRowType(CharType charType, JsonGenerator jsonGenerator)
+    private static void serializeCollection(
+            LogicalType elementType, JsonGenerator jsonGenerator, boolean serializeCatalogObjects)
             throws IOException {
-        // Zero-length character strings have no serializable string representation.
-        if (charType.getLength() == CharType.EMPTY_LITERAL_LENGTH) {
-            jsonGenerator.writeStartObject();
-            jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, charType.getTypeRoot().name());
-            jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, charType.isNullable());
-            jsonGenerator.writeNumberField(FIELD_NAME_LENGTH, 0);
-            jsonGenerator.writeEndObject();
-        } else {
-            jsonGenerator.writeObject(charType.asSerializableString());
-        }
+        jsonGenerator.writeFieldName(FIELD_NAME_ELEMENT_TYPE);
+        serializeInternal(elementType, jsonGenerator, serializeCatalogObjects);
     }
 
-    private void serializeVarCharType(VarCharType varCharType, JsonGenerator jsonGenerator)
+    private static void serializeMap(
+            MapType mapType, JsonGenerator jsonGenerator, boolean serializeCatalogObjects)
             throws IOException {
-        // Zero-length character strings have no serializable string representation.
-        if (varCharType.getLength() == VarCharType.EMPTY_LITERAL_LENGTH) {
-            jsonGenerator.writeStartObject();
-            jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, varCharType.getTypeRoot().name());
-            jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, varCharType.isNullable());
-            jsonGenerator.writeNumberField(FIELD_NAME_LENGTH, 0);
-            jsonGenerator.writeEndObject();
-        } else {
-            jsonGenerator.writeObject(varCharType.asSerializableString());
-        }
+        jsonGenerator.writeFieldName(FIELD_NAME_KEY_TYPE);
+        serializeInternal(mapType.getKeyType(), jsonGenerator, serializeCatalogObjects);
+        jsonGenerator.writeFieldName(FIELD_NAME_VALUE_TYPE);
+        serializeInternal(mapType.getValueType(), jsonGenerator, serializeCatalogObjects);
     }
 
-    private void serializeBinaryType(BinaryType binaryType, JsonGenerator jsonGenerator)
+    private static void serializeRow(
+            RowType rowType, JsonGenerator jsonGenerator, boolean serializeCatalogObjects)
             throws IOException {
-        // Zero-length binary strings have no serializable string representation.
-        if (binaryType.getLength() == BinaryType.EMPTY_LITERAL_LENGTH) {
+        jsonGenerator.writeArrayFieldStart(FIELD_NAME_FIELDS);
+        for (RowType.RowField rowField : rowType.getFields()) {
             jsonGenerator.writeStartObject();
-            jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, binaryType.getTypeRoot().name());
-            jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, binaryType.isNullable());
-            jsonGenerator.writeNumberField(FIELD_NAME_LENGTH, 0);
+            jsonGenerator.writeStringField(FIELD_NAME_FIELD_NAME, rowField.getName());
+            jsonGenerator.writeFieldName(FIELD_NAME_FIELD_TYPE);
+            serializeInternal(rowField.getType(), jsonGenerator, serializeCatalogObjects);
+            if (rowField.getDescription().isPresent()) {
+                jsonGenerator.writeStringField(
+                        FIELD_NAME_FIELD_DESCRIPTION, rowField.getDescription().get());
+            }
             jsonGenerator.writeEndObject();
-        } else {
-            jsonGenerator.writeObject(binaryType.asSerializableString());
         }
+        jsonGenerator.writeEndArray();
     }
 
-    private void serializeVarBinaryType(VarBinaryType varBinaryType, JsonGenerator jsonGenerator)
+    private static void serializeDistinctType(
+            DistinctType distinctType, JsonGenerator jsonGenerator, boolean serializeCatalogObjects)
             throws IOException {
-        // Zero-length binary strings have no serializable string representation.
-        if (varBinaryType.getLength() == VarBinaryType.EMPTY_LITERAL_LENGTH) {
-            jsonGenerator.writeStartObject();
+        jsonGenerator.writeObjectField(
+                FIELD_NAME_OBJECT_IDENTIFIER,
+                distinctType.getObjectIdentifier().orElseThrow(IllegalStateException::new));
+        if (distinctType.getDescription().isPresent()) {
             jsonGenerator.writeStringField(
-                    FIELD_NAME_TYPE_NAME, varBinaryType.getTypeRoot().name());
-            jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, varBinaryType.isNullable());
-            jsonGenerator.writeNumberField(FIELD_NAME_LENGTH, 0);
-            jsonGenerator.writeEndObject();
-        } else {
-            jsonGenerator.writeObject(varBinaryType.asSerializableString());
+                    FIELD_NAME_FIELD_DESCRIPTION, distinctType.getDescription().get());
         }
+        jsonGenerator.writeFieldName(FIELD_NAME_SOURCE_TYPE);
+        serializeInternal(distinctType.getSourceType(), jsonGenerator, serializeCatalogObjects);
     }
 
-    private void serializeSymbolType(SymbolType<?> symbolType, JsonGenerator jsonGenerator)
-            throws IOException {
-        jsonGenerator.writeStartObject();
-        jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, symbolType.isNullable());
-        jsonGenerator.writeStringField(
-                FIELD_NAME_SYMBOL_CLASS, symbolType.getDefaultConversion().getName());
-        jsonGenerator.writeEndObject();
-    }
-
-    private void serializeTypeInformationRawType(
-            TypeInformationRawType<?> rawType, JsonGenerator jsonGenerator) throws IOException {
-        jsonGenerator.writeStartObject();
-        jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, rawType.isNullable());
-        jsonGenerator.writeStringField(
-                FIELD_NAME_TYPE_INFO,
-                EncodingUtils.encodeObjectToString(rawType.getTypeInformation()));
-        jsonGenerator.writeEndObject();
-    }
-
-    private void serializeStructuredType(StructuredType structuredType, JsonGenerator jsonGenerator)
+    private static void serializeStructuredType(
+            StructuredType structuredType,
+            JsonGenerator jsonGenerator,
+            boolean serializeCatalogObjects)
             throws IOException {
-        jsonGenerator.writeStartObject();
-        jsonGenerator.writeStringField(
-                FIELD_NAME_TYPE_NAME, LogicalTypeRoot.STRUCTURED_TYPE.name());
-        jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, structuredType.isNullable());
         if (structuredType.getObjectIdentifier().isPresent()) {
             jsonGenerator.writeObjectField(
-                    FIELD_NAME_IDENTIFIER, structuredType.getObjectIdentifier().get());
+                    FIELD_NAME_OBJECT_IDENTIFIER, structuredType.getObjectIdentifier().get());
         }
-        if (structuredType.getImplementationClass().isPresent()) {
+        if (structuredType.getDescription().isPresent()) {
             jsonGenerator.writeStringField(
-                    FIELD_NAME_IMPLEMENTATION_CLASS,
-                    structuredType.getImplementationClass().get().getName());
+                    FIELD_NAME_DESCRIPTION, structuredType.getDescription().get());
+        }
+        if (structuredType.getImplementationClass().isPresent()) {
+            jsonGenerator.writeObjectField(
+                    FIELD_NAME_IMPLEMENTATION_CLASS, structuredType.getImplementationClass().get());
         }
         jsonGenerator.writeFieldName(FIELD_NAME_ATTRIBUTES);
         jsonGenerator.writeStartArray();
-        for (StructuredType.StructuredAttribute attribute : structuredType.getAttributes()) {
+        for (StructuredAttribute attribute : structuredType.getAttributes()) {
             jsonGenerator.writeStartObject();
-            jsonGenerator.writeStringField(FIELD_NAME_NAME, attribute.getName());
-            jsonGenerator.writeObjectField(FIELD_NAME_LOGICAL_TYPE, attribute.getType());
+            jsonGenerator.writeStringField(FIELD_NAME_ATTRIBUTE_NAME, attribute.getName());
+            jsonGenerator.writeFieldName(FIELD_NAME_ATTRIBUTE_TYPE);
+            serializeInternal(attribute.getType(), jsonGenerator, serializeCatalogObjects);
             if (attribute.getDescription().isPresent()) {
                 jsonGenerator.writeStringField(
-                        FIELD_NAME_DESCRIPTION, attribute.getDescription().get());
+                        FIELD_NAME_ATTRIBUTE_DESCRIPTION, attribute.getDescription().get());
             }
             jsonGenerator.writeEndObject();
         }
         jsonGenerator.writeEndArray();
-        jsonGenerator.writeBooleanField(FIELD_NAME_FINAL, structuredType.isFinal());
-        jsonGenerator.writeBooleanField(FIELD_NAME_INSTANTIABLE, structuredType.isInstantiable());
-        jsonGenerator.writeStringField(
-                FIELD_NAME_COMPARISON, structuredType.getComparison().name());
-        if (structuredType.getSuperType().isPresent()) {
-            jsonGenerator.writeObjectField(
-                    FIELD_NAME_SUPPER_TYPE, structuredType.getSuperType().get());
+        if (!structuredType.isFinal()) {
+            jsonGenerator.writeBooleanField(FIELD_NAME_FINAL, false);
         }
-        if (structuredType.getDescription().isPresent()) {
+        if (!structuredType.isInstantiable()) {
+            jsonGenerator.writeBooleanField(FIELD_NAME_INSTANTIABLE, false);
+        }
+        if (structuredType.getComparison() != StructuredComparison.NONE) {
             jsonGenerator.writeStringField(
-                    FIELD_NAME_DESCRIPTION, structuredType.getDescription().get());
+                    FIELD_NAME_COMPARISON, structuredType.getComparison().name());
+        }
+        if (structuredType.getSuperType().isPresent()) {
+            jsonGenerator.writeObjectField(
+                    FIELD_NAME_SUPER_TYPE, structuredType.getSuperType().get());
         }
-        jsonGenerator.writeEndObject();
     }
 
-    private void serializeDistinctType(DistinctType distinctType, JsonGenerator jsonGenerator)
+    private static void serializeSpecializedRaw(RawType<?> rawType, JsonGenerator jsonGenerator)
             throws IOException {
-        jsonGenerator.writeStartObject();
-        jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, LogicalTypeRoot.DISTINCT_TYPE.name());
-        Preconditions.checkArgument(distinctType.getObjectIdentifier().isPresent());
-        jsonGenerator.writeObjectField(
-                FIELD_NAME_IDENTIFIER, distinctType.getObjectIdentifier().get());
-        jsonGenerator.writeObjectField(FIELD_NAME_SOURCE_TYPE, distinctType.getSourceType());
-        if (distinctType.getDescription().isPresent()) {
+        jsonGenerator.writeStringField(FIELD_NAME_CLASS, rawType.getOriginatingClass().getName());
+        final TypeSerializer<?> serializer = rawType.getTypeSerializer();
+        if (serializer.equals(NullSerializer.INSTANCE)) {
             jsonGenerator.writeStringField(
-                    FIELD_NAME_DESCRIPTION, distinctType.getDescription().get());
+                    FIELD_NAME_SPECIAL_SERIALIZER, FIELD_VALUE_EXTERNAL_SERIALIZER_NULL);
+        } else if (serializer instanceof ExternalSerializer) {
+            final ExternalSerializer<?, ?> externalSerializer =
+                    (ExternalSerializer<?, ?>) rawType.getTypeSerializer();
+            if (externalSerializer.isInternalInput()) {
+                throw new TableException(
+                        "Asymmetric external serializers are currently not supported. "
+                                + "The input must not be internal if the output is external.");
+            }
+            jsonGenerator.writeObjectField(
+                    FIELD_NAME_EXTERNAL_DATA_TYPE, externalSerializer.getDataType());
+        } else {
+            throw new TableException("Unsupported special case for RAW type.");
         }
-        jsonGenerator.writeEndObject();
     }
 
-    private void serializeTimestampType(TimestampType timestampType, JsonGenerator jsonGenerator)
-            throws IOException {
-        jsonGenerator.writeStartObject();
-        jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, timestampType.getTypeRoot().name());
-        jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, timestampType.isNullable());
-        jsonGenerator.writeNumberField(FIELD_NAME_PRECISION, timestampType.getPrecision());
-        jsonGenerator.writeObjectField(FIELD_NAME_TIMESTAMP_KIND, timestampType.getKind());
-        jsonGenerator.writeEndObject();
-    }
+    // --------------------------------------------------------------------------------------------
+    // Compact Serialization
+    // --------------------------------------------------------------------------------------------
 
-    private void serializeZonedTimestampType(
-            ZonedTimestampType timestampType, JsonGenerator jsonGenerator) throws IOException {
-        jsonGenerator.writeStartObject();
-        jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, timestampType.getTypeRoot().name());
-        jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, timestampType.isNullable());
-        jsonGenerator.writeNumberField(FIELD_NAME_PRECISION, timestampType.getPrecision());
-        jsonGenerator.writeObjectField(FIELD_NAME_TIMESTAMP_KIND, timestampType.getKind());
-        jsonGenerator.writeEndObject();
+    private static boolean supportsCompactSerialization(
+            LogicalType logicalType, boolean serializeCatalogObjects) {
+        return logicalType.accept(new CompactSerializationChecker(serializeCatalogObjects));
     }
 
-    private void serializeLocalZonedTimestampType(
-            LocalZonedTimestampType timestampType, JsonGenerator jsonGenerator) throws IOException {
-        jsonGenerator.writeStartObject();
-        jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, timestampType.getTypeRoot().name());
-        jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, timestampType.isNullable());
-        jsonGenerator.writeNumberField(FIELD_NAME_PRECISION, timestampType.getPrecision());
-        jsonGenerator.writeObjectField(FIELD_NAME_TIMESTAMP_KIND, timestampType.getKind());
-        jsonGenerator.writeEndObject();
+    private static void serializeTypeWithCompactSerialization(
+            LogicalType logicalType, JsonGenerator jsonGenerator) throws IOException {
+        final String compactString = logicalType.asSerializableString();
+        jsonGenerator.writeString(compactString);
     }
 
-    @SuppressWarnings("rawtypes")
-    private void serializeRawType(
-            RawType<?> rawType, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
-            throws IOException {
-        TypeSerializer<?> typeSer = rawType.getTypeSerializer();
-        if (typeSer instanceof ExternalSerializer) {
-            ExternalSerializer externalSer = (ExternalSerializer) typeSer;
-            // Currently, ExternalSerializer with `isInternalInput=false` will be serialized,
-            // Once `isInternalInput=true` needs to be serialized, we can add individual field in
-            // the json to support it, and the new json plan is compatible with the previous one.
-            if (externalSer.isInternalInput()) {
-                throw new TableException(
-                        "ExternalSerializer with `isInternalInput=true` is not supported.");
-            }
-            DataType dataType = externalSer.getDataType();
-            boolean isMapView = DataViewUtils.isMapViewDataType(dataType);
-            boolean isListView = DataViewUtils.isListViewDataType(dataType);
-            if (isMapView || isListView) {
-                jsonGenerator.writeStartObject();
-                jsonGenerator.writeStringField(FIELD_NAME_TYPE_NAME, LogicalTypeRoot.RAW.name());
-                jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, rawType.isNullable());
-                if (isMapView) {
-                    jsonGenerator.writeStringField(
-                            FIELD_NAME_DATA_VIEW_CLASS, MapView.class.getName());
-                    KeyValueDataType keyValueDataType =
-                            DataViewUtils.extractKeyValueDataTypeForMapView(dataType);
-                    serializeDataTypeForDataView(
-                            FIELD_NAME_KEY_TYPE,
-                            keyValueDataType.getKeyDataType(),
-                            jsonGenerator,
-                            serializerProvider);
-                    serializeDataTypeForDataView(
-                            FIELD_NAME_VALUE_TYPE,
-                            keyValueDataType.getValueDataType(),
-                            jsonGenerator,
-                            serializerProvider);
-                } else {
-                    jsonGenerator.writeStringField(
-                            FIELD_NAME_DATA_VIEW_CLASS, ListView.class.getName());
-                    DataType elementType =
-                            DataViewUtils.extractElementDataTypeForListView(dataType);
-                    serializeDataTypeForDataView(
-                            FIELD_NAME_ELEMENT_TYPE,
-                            elementType,
-                            jsonGenerator,
-                            serializerProvider);
-                }
-                jsonGenerator.writeEndObject();
-                return;
-            }
+    /**
+     * Checks whether the given type can be serialized as a compact string created from {@link
+     * LogicalType#asSerializableString()}.
+     */
+    private static class CompactSerializationChecker extends LogicalTypeDefaultVisitor<Boolean> {
+
+        private final boolean serializeCatalogObjects;
+
+        CompactSerializationChecker(boolean serializeCatalogObjects) {
+            this.serializeCatalogObjects = serializeCatalogObjects;
         }
 
-        jsonGenerator.writeObject(rawType.asSerializableString());
-    }
+        @Override
+        public Boolean visit(CharType charType) {
+            return charType.getLength() > 0;

Review comment:
       Isn't this always true because CharType of length 0 is invalid?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org