Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/24 14:31:13 UTC

[GitHub] [flink] twalthr commented on a change in pull request #18427: [FLINK-25386][table] Harden table persisted plan

twalthr commented on a change in pull request #18427:
URL: https://github.com/apache/flink/pull/18427#discussion_r790700555



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java
##########
@@ -80,7 +80,25 @@
     @PublicEvolving
     interface Context {
 
-        /** Returns the identifier of the table in the {@link Catalog}. */
+        /**
+         * Returns the identifier of the table in the {@link Catalog}.
+         *
+         * <p>This identifier defines the relationship between the table instance and the associated
+         * {@link Catalog} (if any), but it doesn't uniquely identify this specific table
+         * setup/instance across a table program, as the same table might be stored in different
+         * catalogs or, in case of anonymous tables, this identifier is auto-generated
+         * non-deterministic. Because of that behaviour, We strongly suggest using this identifier

Review comment:
       `behaviour, We`
   
   Similar comment as before: Try to split long sentences. It is very uncommon in English to have long nested sentences:
   
   ```
   This identifier defines the relationship between the table instance and the associated {@link Catalog} (if any). However, it doesn't uniquely identify this specific table setup/instance across a table program. The same table might be stored in different catalogs or, in case of anonymous tables, this identifier is auto-generated and non-deterministic.
   ```

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/RexNodeExpression.java
##########
@@ -108,4 +109,23 @@ public DataType getOutputDataType() {
     public String toString() {
         return asSummaryString();
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        RexNodeExpression that = (RexNodeExpression) o;
+        return Objects.equals(rexNode, that.rexNode)

Review comment:
       nit: `rexNode` and `outputDataType` cannot be null, so you could use `equals` directly.
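
   A minimal sketch of the simplification, assuming both fields are always initialized:

   ```
   RexNodeExpression that = (RexNodeExpression) o;
   // Both fields are non-null, so Objects.equals is not needed.
   return rexNode.equals(that.rexNode) && outputDataType.equals(that.outputDataType);
   ```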

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/AggregatePushDownSpec.java
##########
@@ -204,4 +205,26 @@ public static boolean apply(
         }
         return aggExpressions;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {

Review comment:
       mark this class as `final`, and the other classes of this commit as well?

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/DynamicTableFactory.java
##########
@@ -80,7 +80,25 @@
     @PublicEvolving
     interface Context {
 
-        /** Returns the identifier of the table in the {@link Catalog}. */
+        /**
+         * Returns the identifier of the table in the {@link Catalog}.
+         *
+         * <p>This identifier defines the relationship between the table instance and the associated
+         * {@link Catalog} (if any), but it doesn't uniquely identify this specific table
+         * setup/instance across a table program, as the same table might be stored in different
+         * catalogs or, in case of anonymous tables, this identifier is auto-generated
+         * non-deterministic. Because of that behaviour, We strongly suggest using this identifier
+         * only for debugging purpose, and rely on user input for uniquely identifying a "table
+         * instance".
+         *
+         * <p>For example, when implementing a Kafka source using consumer groups, the user should
+         * provide the consumer group id manually rather than using this identifier as the consumer
+         * group id, so the offset tracking remains stable even if this table is anonymous, or it's
+         * moved to another {@link Catalog}.
+         *
+         * <p>Note that for anonymous tables {@link ObjectIdentifier#asSerializableString()} will
+         * fail, so we suggest to use {@link ObjectIdentifier#asSummaryString()} for debugging.

Review comment:
       `debugging` -> `printing and logging`

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonSerializer.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.catalog.Column;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.serializeOptionalField;
+
+class ColumnJsonSerializer extends StdSerializer<Column> {
+
+    public static final String COLUMN_TYPE = "type";
+    public static final String COLUMN_TYPE_PHYSICAL = "physical";
+    public static final String COLUMN_TYPE_COMPUTED = "computed";
+    public static final String COLUMN_TYPE_METADATA = "metadata";
+    public static final String NAME = "name";
+    public static final String DATA_TYPE = "dataType";
+    public static final String COMMENT = "comment";
+    public static final String EXPRESSION = "expression";
+    public static final String METADATA_KEY = "metadataKey";
+    public static final String IS_VIRTUAL = "isVirtual";
+
+    public ColumnJsonSerializer() {
+        super(Column.class);
+    }
+
+    @Override
+    public void serialize(
+            Column column, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
+            throws IOException {
+        jsonGenerator.writeStartObject();
+
+        // Common fields
+        jsonGenerator.writeStringField(NAME, column.getName());
+        serializeOptionalField(jsonGenerator, COMMENT, column.getComment(), serializerProvider);
+
+        if (column instanceof Column.PhysicalColumn) {
+            serialize((Column.PhysicalColumn) column, jsonGenerator, serializerProvider);
+        } else if (column instanceof Column.MetadataColumn) {
+            serialize((Column.MetadataColumn) column, jsonGenerator, serializerProvider);
+        } else {

Review comment:
       use `else if` here
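
   A sketch of the suggested dispatch inside `serialize(...)`; the error message wording is hypothetical:

   ```
   if (column instanceof Column.PhysicalColumn) {
       serialize((Column.PhysicalColumn) column, jsonGenerator, serializerProvider);
   } else if (column instanceof Column.MetadataColumn) {
       serialize((Column.MetadataColumn) column, jsonGenerator, serializerProvider);
   } else if (column instanceof Column.ComputedColumn) {
       serialize((Column.ComputedColumn) column, jsonGenerator, serializerProvider);
   } else {
       // Fail loudly on future Column subclasses instead of miscasting them.
       throw new ValidationException("Unknown column kind: " + column.getClass().getName());
   }
   ```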

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonDeserializer.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.ObjectCodec;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.COLUMN_TYPE;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.COLUMN_TYPE_COMPUTED;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.COLUMN_TYPE_METADATA;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.COLUMN_TYPE_PHYSICAL;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.COMMENT;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.DATA_TYPE;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.EXPRESSION;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.IS_VIRTUAL;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.METADATA_KEY;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.NAME;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.deserializeOptionalField;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.traverse;
+
+class ColumnJsonDeserializer extends StdDeserializer<Column> {
+
+    private static final String[] SUPPORTED_COLUMN_TYPES =

Review comment:
       nit: I would recommend `KIND` instead of `TYPE`; it makes discussions easier.

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java
##########
@@ -36,32 +37,56 @@
 import javax.annotation.Nullable;
 
 import java.util.List;
+import java.util.Objects;
 
 /**
  * {@link DynamicTableSourceSpec} describes how to serialize/deserialize dynamic table sink table
  * and create {@link DynamicTableSink} from the deserialization result.
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
 @JsonInclude(JsonInclude.Include.NON_EMPTY)
-public class DynamicTableSinkSpec extends CatalogTableSpecBase {
+public class DynamicTableSinkSpec {
 
+    public static final String FIELD_NAME_CATALOG_TABLE_SPEC = "catalogTable";

Review comment:
       `FIELD_NAME_CATALOG_TABLE` because it is not a spec

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableJsonSerializer.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ExternalCatalogTable;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+/**
+ * This serializer can be configured via an attribute to serialize or not the options and comments,
+ * setting the attribute {@link #SERIALIZE_OPTIONS} to {@code true} or {@code false}.
+ */
+class ResolvedCatalogTableJsonSerializer extends StdSerializer<ResolvedCatalogTable> {
+    private static final long serialVersionUID = 1L;
+
+    static final String SERIALIZE_OPTIONS = "serialize_options";
+
+    public static final String RESOLVED_SCHEMA = "resolvedSchema";
+    public static final String PARTITION_KEYS = "partitionKeys";
+    public static final String OPTIONS = "options";
+    public static final String COMMENT = "comment";
+
+    public ResolvedCatalogTableJsonSerializer() {
+        super(ResolvedCatalogTable.class);
+    }
+
+    @Override
+    public void serialize(
+            ResolvedCatalogTable resolvedCatalogTable,
+            JsonGenerator jsonGenerator,
+            SerializerProvider serializerProvider)
+            throws IOException {
+        // This should never happen anyway, but we keep this assertion as a sanity check
+        assert resolvedCatalogTable.getTableKind() == CatalogBaseTable.TableKind.TABLE;
+
+        boolean serializeOptions =

Review comment:
       I find it confusing to have a second way of reading configuration. Why not use the `SerdeContext` instead? When people search for usages of the table option, they will not find them in a second config stack.
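
   A sketch of what reading through the `SerdeContext` could look like in `serialize(...)`, mirroring how the deserializer reads `PLAN_RESTORE_CATALOG_OBJECTS` (the exact mapping of the option to a boolean is an assumption):

   ```
   // Read the table option from the SerdeContext configuration, so searching for
   // usages of TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS finds this spot too.
   boolean serializeOptions =
           SerdeContext.get(serializerProvider)
                   .getConfiguration()
                   .get(TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS)
                   != TableConfigOptions.CatalogPlanCompilation.IDENTIFIER;
   ```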

##########
File path: flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out
##########
@@ -1,76 +1,110 @@
 {
-   "flinkVersion":"",
-   "nodes":[
-      {
-         "class":"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan",
-         "scanTableSource":{
-            "identifier":{
-               "catalogName":"default_catalog",
-               "databaseName":"default_database",
-               "tableName":"MyTable"
+  "flinkVersion": "",
+  "nodes": [
+    {
+      "class": "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan",
+      "scanTableSource": {
+        "catalogTable": {
+          "identifier": "`default_catalog`.`default_database`.`MyTable`",
+          "catalogTable": {
+            "resolvedSchema": {
+              "columns": [
+                {
+                  "name": "a",
+                  "type": "physical",
+                  "dataType": "BIGINT"
+                },
+                {
+                  "name": "b",
+                  "type": "physical",

Review comment:
       let's still try to comply with `CatalogPropertiesUtil#serializeCatalogTable`. E.g. we can omit `"type": "physical"`

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonDeserializer.java
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanRestore;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.ContextResolvedTableJsonSerializer.FIELD_NAME_CATALOG_TABLE;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.ContextResolvedTableJsonSerializer.FIELD_NAME_IDENTIFIER;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedCatalogTableJsonSerializer.OPTIONS;
+
+class ContextResolvedTableJsonDeserializer extends StdDeserializer<ContextResolvedTable> {
+    private static final long serialVersionUID = 1L;
+
+    public ContextResolvedTableJsonDeserializer() {
+        super(ContextResolvedTable.class);
+    }
+
+    @Override
+    public ContextResolvedTable deserialize(JsonParser jsonParser, DeserializationContext ctx)
+            throws IOException {
+        final CatalogPlanRestore planRestoreOption =
+                SerdeContext.get(ctx)
+                        .getConfiguration()
+                        .get(TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS);
+        final CatalogManager catalogManager =
+                SerdeContext.get(ctx).getFlinkContext().getCatalogManager();
+        final ObjectNode objectNode = jsonParser.readValueAsTree();
+
+        // Deserialize the two fields, if available
+        final ObjectIdentifier identifier =
+                JsonSerdeUtil.deserializeOptionalField(
+                                objectNode,
+                                FIELD_NAME_IDENTIFIER,
+                                ObjectIdentifier.class,
+                                jsonParser.getCodec(),
+                                ctx)
+                        .orElse(null);
+        ResolvedCatalogTable resolvedCatalogTable =
+                JsonSerdeUtil.deserializeOptionalField(
+                                objectNode,
+                                FIELD_NAME_CATALOG_TABLE,
+                                ResolvedCatalogTable.class,
+                                jsonParser.getCodec(),
+                                ctx)
+                        .orElse(null);
+
+        if (identifier == null && resolvedCatalogTable == null) {
+            throw new ValidationException(
+                    String.format(
+                            "The input json is invalid because it doesn't contain '%s', nor the '%s'.",
+                            FIELD_NAME_IDENTIFIER, FIELD_NAME_CATALOG_TABLE));
+        }
+
+        if (identifier == null) {
+            if (isLookupForced(planRestoreOption)) {
+                throw missingIdentifier();
+            }
+            return ContextResolvedTable.anonymous(resolvedCatalogTable);
+        }
+
+        Optional<ContextResolvedTable> contextResolvedTableFromCatalog =
+                isLookupEnabled(planRestoreOption)
+                        ? catalogManager.getTable(identifier)
+                        : Optional.empty();
+
+        // If we have a schema from the plan and from the catalog, we need to check they match.
+        if (contextResolvedTableFromCatalog.isPresent() && resolvedCatalogTable != null) {
+            ResolvedSchema schemaFromPlan = resolvedCatalogTable.getResolvedSchema();
+            ResolvedSchema schemaFromCatalog =
+                    contextResolvedTableFromCatalog.get().getResolvedSchema();
+            if (!areResolvedSchemasEqual(schemaFromPlan, schemaFromCatalog)) {
+                throw schemaNotMatching(identifier, schemaFromPlan, schemaFromCatalog);
+            }
+        }
+
+        if (resolvedCatalogTable == null || isLookupForced(planRestoreOption)) {
+            if (!isLookupEnabled(planRestoreOption)) {
+                throw lookupDisabled(identifier);
+            }
+            // We use what is stored inside the catalog
+            return contextResolvedTableFromCatalog.orElseThrow(
+                    () -> missingTableFromCatalog(identifier));
+        }
+
+        if (contextResolvedTableFromCatalog.isPresent()) {
+            // If no config map is present, then the ContextResolvedTable was serialized with
+            // SCHEMA, so we just need to return the catalog query result
+            if (objectNode.at("/" + FIELD_NAME_CATALOG_TABLE + "/" + OPTIONS).isMissingNode()) {
+                return contextResolvedTableFromCatalog.get();
+            }
+
+            return contextResolvedTableFromCatalog
+                    .flatMap(ContextResolvedTable::getCatalog)
+                    .map(c -> ContextResolvedTable.permanent(identifier, c, resolvedCatalogTable))
+                    .orElseGet(
+                            () -> ContextResolvedTable.temporary(identifier, resolvedCatalogTable));
+        }
+
+        return ContextResolvedTable.temporary(identifier, resolvedCatalogTable);
+    }
+
+    private boolean areResolvedSchemasEqual(
+            ResolvedSchema schemaFromPlan, ResolvedSchema schemaFromCatalog) {
+        // For schema equality we check:
+        //  * Columns size and order
+        //  * For each column: name, kind (class) and type
+        //  * Check partition keys set equality
+        @SuppressWarnings("rawtypes")
+        List<Tuple3<String, Class, DataType>> columnsFromPlan =
+                schemaFromPlan.getColumns().stream()
+                        .map(c -> Tuple3.of(c.getName(), (Class) c.getClass(), c.getDataType()))
+                        .collect(Collectors.toList());
+
+        @SuppressWarnings("rawtypes")
+        List<Tuple3<String, Class, DataType>> columnsFromCatalog =
+                schemaFromCatalog.getColumns().stream()
+                        .map(c -> Tuple3.of(c.getName(), (Class) c.getClass(), c.getDataType()))
+                        .collect(Collectors.toList());
+
+        return Objects.equals(columnsFromPlan, columnsFromCatalog)
+                && Objects.equals(
+                        schemaFromPlan.getPrimaryKey(), schemaFromCatalog.getPrimaryKey());
+    }
+
+    private boolean isLookupForced(CatalogPlanRestore planRestoreOption) {
+        return planRestoreOption == CatalogPlanRestore.IDENTIFIER;
+    }
+
+    private boolean isLookupEnabled(CatalogPlanRestore planRestoreOption) {
+        return planRestoreOption != CatalogPlanRestore.ALL_ENFORCED;
+    }
+
+    static ValidationException missingIdentifier() {
+        return new ValidationException(
+                String.format(
+                        "The ContextResolvedTable cannot be deserialized, as no identifier is present within the json, "
+                                + "but lookup is forced by '%s' == '%s'. "
+                                + "Either allow restoring table from the catalog with '%s' == '%s' | '%s' or make sure you don't use anonymous tables when generating the plan.",
+                        TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS.key(),
+                        CatalogPlanRestore.IDENTIFIER.name(),
+                        TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS.key(),
+                        CatalogPlanRestore.ALL.name(),
+                        CatalogPlanRestore.ALL_ENFORCED.name()));
+    }
+
+    static ValidationException lookupDisabled(ObjectIdentifier objectIdentifier) {
+        return new ValidationException(
+                String.format(
+                        "The ContextResolvedTable with identifier %s does not contain any %s field, "
+                                + "but lookup is disabled because option '%s' == '%s'. "
+                                + "Either enable the catalog lookup with '%s' == '%s' | '%s' or regenerate the plan with '%s' != '%s'.",
+                        objectIdentifier,
+                        FIELD_NAME_CATALOG_TABLE,
+                        TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS.key(),
+                        CatalogPlanRestore.ALL_ENFORCED.name(),
+                        TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS.key(),
+                        CatalogPlanRestore.IDENTIFIER.name(),
+                        CatalogPlanRestore.ALL.name(),
+                        TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS.key(),
+                        TableConfigOptions.CatalogPlanCompilation.IDENTIFIER.name()));
+    }
+
+    static ValidationException schemaNotMatching(
+            ObjectIdentifier objectIdentifier,
+            ResolvedSchema schemaFromPlan,
+            ResolvedSchema schemaFromCatalog) {
+        return new ValidationException(
+                String.format(
+                        "The schema of the table '%s' from the persisted plan does not match the schema loaded from the catalog: '%s' != '%s'. "
+                                + "Have you modified the table schema in the catalog before restoring the plan?.",
+                        objectIdentifier, schemaFromPlan, schemaFromCatalog));
+    }
+
+    static ValidationException missingTableFromCatalog(ObjectIdentifier objectIdentifier) {
+        return new ValidationException(
+                String.format(
+                        "CatalogManager cannot resolve the table with identifier %s and ContextResolvedTable does not contain any %s field. "

Review comment:
       Don't expose internal classes such as `CatalogManager` or `ContextResolvedTable` in exceptions. Also above.
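
   For example, a hypothetical rewording of the last message that avoids the internal class names:

   ```
   throw new ValidationException(
           String.format(
                   "Cannot resolve the table '%s', and the persisted plan does not "
                           + "include all the required catalog metadata.",
                   objectIdentifier));
   ```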

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonSerializer.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanCompilation;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+/** JSON serializer for {@link ContextResolvedTable}. */
+class ContextResolvedTableJsonSerializer extends StdSerializer<ContextResolvedTable> {
+    private static final long serialVersionUID = 1L;
+
+    public static final String FIELD_NAME_IDENTIFIER = "identifier";
+    public static final String FIELD_NAME_CATALOG_TABLE = "catalogTable";
+
+    public ContextResolvedTableJsonSerializer() {
+        super(ContextResolvedTable.class);
+    }
+
+    @Override
+    public void serialize(
+            ContextResolvedTable contextResolvedTable,
+            JsonGenerator jsonGenerator,
+            SerializerProvider serializerProvider)
+            throws IOException {
+        final CatalogPlanCompilation planCompilationOption =
+                SerdeContext.get(serializerProvider)
+                        .getConfiguration()
+                        .get(TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS);
+
+        if (contextResolvedTable.isAnonymous()
+                && planCompilationOption == CatalogPlanCompilation.IDENTIFIER) {
+            throw cannotSerializeAnonymousTable(contextResolvedTable.getIdentifier());
+        }
+
+        jsonGenerator.writeStartObject();
+
+        if (!contextResolvedTable.isAnonymous()) {
+            // Serialize object identifier
+            jsonGenerator.writeObjectField(
+                    FIELD_NAME_IDENTIFIER, contextResolvedTable.getIdentifier());
+        }
+
+        if ((contextResolvedTable.isPermanent() || contextResolvedTable.isAnonymous())
+                && planCompilationOption != CatalogPlanCompilation.IDENTIFIER) {
+            // Pass to the ResolvedCatalogTableJsonSerializer the option to serialize or not the
+            // identifier
+            serializerProvider.setAttribute(

Review comment:
       I really find it weird that a serializer sets attributes.

##########
File path: flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out
##########
@@ -3,28 +3,117 @@
   "nodes" : [ {
     "class" : "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan",
     "scanTableSource" : {
-      "identifier" : {
-        "catalogName" : "default_catalog",
-        "databaseName" : "default_database",
-        "tableName" : "MyTable"
-      },
       "catalogTable" : {
-        "schema.watermark.0.strategy.expr" : "`rowtime` - INTERVAL '1' SECOND",
-        "schema.4.expr" : "PROCTIME()",
-        "schema.0.data-type" : "INT",
-        "schema.2.name" : "c",
-        "schema.1.name" : "b",
-        "schema.4.name" : "proctime",
-        "schema.1.data-type" : "BIGINT",
-        "schema.3.data-type" : "TIMESTAMP(3)",
-        "schema.2.data-type" : "VARCHAR(2147483647)",
-        "schema.3.name" : "rowtime",
-        "connector" : "values",
-        "schema.watermark.0.rowtime" : "rowtime",
-        "schema.watermark.0.strategy.data-type" : "TIMESTAMP(3)",
-        "schema.3.expr" : "TO_TIMESTAMP(`c`)",
-        "schema.4.data-type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL",
-        "schema.0.name" : "a"
+        "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+        "catalogTable" : {
+          "resolvedSchema" : {
+            "columns" : [ {
+              "name" : "a",
+              "type" : "physical",
+              "dataType" : "INT"
+            }, {
+              "name" : "b",
+              "type" : "physical",
+              "dataType" : "BIGINT"
+            }, {
+              "name" : "c",
+              "type" : "physical",
+              "dataType" : {
+                "logicalType" : "VARCHAR(2147483647)",
+                "conversionClass" : "java.lang.String"
+              }
+            }, {
+              "name" : "rowtime",
+              "type" : "computed",
+              "expression" : {
+                "type" : "rexNodeExpression",

Review comment:
       let's remove `"type" : "rexNodeExpression"` until we have something other than `RexNodeExpression`

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedExpressionJsonSerializer.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.planner.expressions.RexNodeExpression;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+class ResolvedExpressionJsonSerializer extends StdSerializer<ResolvedExpression> {
+
+    public static final String TYPE = "type";
+    public static final String TYPE_REX_NODE_EXPRESSION = "rexNodeExpression";
+    public static final String REX_NODE = "rexNode";
+    public static final String OUTPUT_DATA_TYPE = "outputDataType";
+    public static final String SERIALIZABLE_STRING = "serializableString";
+
+    protected ResolvedExpressionJsonSerializer() {
+        super(ResolvedExpression.class);
+    }
+
+    @Override
+    public void serialize(
+            ResolvedExpression resolvedExpression,
+            JsonGenerator jsonGenerator,
+            SerializerProvider serializerProvider)
+            throws IOException {
+        jsonGenerator.writeStartObject();
+
+        if (resolvedExpression instanceof RexNodeExpression) {
+            serialize((RexNodeExpression) resolvedExpression, jsonGenerator, serializerProvider);
+        } else {
+            throw new ValidationException(
+                    String.format(
+                            "Expression '%s' cannot be serialized. "
+                                    + "Currently, only SQL expressions can be serialized in the persisted plan.",
+                            resolvedExpression.asSummaryString()));
+        }
+
+        jsonGenerator.writeEndObject();
+    }
+
+    private void serialize(
+            RexNodeExpression expression,
+            JsonGenerator jsonGenerator,
+            SerializerProvider serializerProvider)
+            throws IOException {
+        jsonGenerator.writeStringField(TYPE, TYPE_REX_NODE_EXPRESSION);
+        serializerProvider.defaultSerializeField(REX_NODE, expression.getRexNode(), jsonGenerator);
+        serializerProvider.defaultSerializeField(

Review comment:
       Can't we even derive the type from the `RexNode`? I have the feeling we are duplicating a lot. Actually, `serializableString` and `rexNode` are also kind of duplicated, given that they are not required anymore: they are persisted in the operator following the plan.
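
   A sketch of the derivation, assuming the planner's `FlinkTypeFactory` conversion applies here:

   ```
   // Recover the logical type from the RexNode itself instead of persisting
   // outputDataType alongside it.
   LogicalType derivedType =
           FlinkTypeFactory.toLogicalType(expression.getRexNode().getType());
   ```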

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java
##########
@@ -36,32 +37,56 @@
 import javax.annotation.Nullable;
 
 import java.util.List;
+import java.util.Objects;
 
 /**
  * {@link DynamicTableSourceSpec} describes how to serialize/deserialize dynamic table sink table
  * and create {@link DynamicTableSink} from the deserialization result.
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
 @JsonInclude(JsonInclude.Include.NON_EMPTY)
-public class DynamicTableSinkSpec extends CatalogTableSpecBase {
+public class DynamicTableSinkSpec {
 
+    public static final String FIELD_NAME_CATALOG_TABLE_SPEC = "catalogTable";
     public static final String FIELD_NAME_SINK_ABILITY_SPECS = "sinkAbilitySpecs";
 
-    @JsonIgnore private DynamicTableSink tableSink;
-
-    @JsonProperty(FIELD_NAME_SINK_ABILITY_SPECS)
+    private final ContextResolvedTable contextResolvedTable;
     private final @Nullable List<SinkAbilitySpec> sinkAbilitySpecs;
 
+    @JsonIgnore private DynamicTableSink tableSink;
+    @JsonIgnore private ClassLoader classLoader;

Review comment:
       why do we need the class loader and configuration here? Can't we access them from other contexts? It seems wrong to me that every instance holds references to those.

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonSerializer.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.catalog.Column;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.serializeOptionalField;
+
+class ColumnJsonSerializer extends StdSerializer<Column> {
+
+    public static final String COLUMN_TYPE = "type";
+    public static final String COLUMN_TYPE_PHYSICAL = "physical";
+    public static final String COLUMN_TYPE_COMPUTED = "computed";
+    public static final String COLUMN_TYPE_METADATA = "metadata";
+    public static final String NAME = "name";
+    public static final String DATA_TYPE = "dataType";
+    public static final String COMMENT = "comment";
+    public static final String EXPRESSION = "expression";
+    public static final String METADATA_KEY = "metadataKey";
+    public static final String IS_VIRTUAL = "isVirtual";
+
+    public ColumnJsonSerializer() {
+        super(Column.class);
+    }
+
+    @Override
+    public void serialize(
+            Column column, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
+            throws IOException {
+        jsonGenerator.writeStartObject();
+
+        // Common fields
+        jsonGenerator.writeStringField(NAME, column.getName());
+        serializeOptionalField(jsonGenerator, COMMENT, column.getComment(), serializerProvider);
+
+        if (column instanceof Column.PhysicalColumn) {
+            serialize((Column.PhysicalColumn) column, jsonGenerator, serializerProvider);
+        } else if (column instanceof Column.MetadataColumn) {
+            serialize((Column.MetadataColumn) column, jsonGenerator, serializerProvider);
+        } else {
+            serialize((Column.ComputedColumn) column, jsonGenerator, serializerProvider);
+        }
+
+        jsonGenerator.writeEndObject();
+    }
+
+    private void serialize(

Review comment:
       `static`

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ColumnJsonDeserializer.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.ObjectCodec;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.COLUMN_TYPE;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.COLUMN_TYPE_COMPUTED;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.COLUMN_TYPE_METADATA;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.COLUMN_TYPE_PHYSICAL;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.COMMENT;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.DATA_TYPE;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.EXPRESSION;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.IS_VIRTUAL;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.METADATA_KEY;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.ColumnJsonSerializer.NAME;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.deserializeOptionalField;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil.traverse;
+
+class ColumnJsonDeserializer extends StdDeserializer<Column> {
+
+    private static final String[] SUPPORTED_COLUMN_TYPES =
+            new String[] {COLUMN_TYPE_PHYSICAL, COLUMN_TYPE_COMPUTED, COLUMN_TYPE_METADATA};
+
+    public ColumnJsonDeserializer() {
+        super(Column.class);
+    }
+
+    @Override
+    public Column deserialize(JsonParser jsonParser, DeserializationContext ctx)
+            throws IOException {
+        ObjectNode jsonNode = jsonParser.readValueAsTree();
+        String columnName = jsonNode.required(NAME).asText();
+        String columnType = jsonNode.required(COLUMN_TYPE).asText();
+
+        Column column;
+        switch (columnType) {
+            case COLUMN_TYPE_PHYSICAL:
+                column =
+                        deserializePhysicalColumn(columnName, jsonNode, jsonParser.getCodec(), ctx);
+                break;
+            case COLUMN_TYPE_COMPUTED:
+                column =
+                        deserializeComputedColumn(columnName, jsonNode, jsonParser.getCodec(), ctx);
+                break;
+            case COLUMN_TYPE_METADATA:
+                column =
+                        deserializeMetadataColumn(columnName, jsonNode, jsonParser.getCodec(), ctx);
+                break;
+            default:
+                throw new ValidationException(
+                        String.format(
+                                "Cannot recognize column type '%s'. Allowed types: %s.",
+                                columnType, Arrays.toString(SUPPORTED_COLUMN_TYPES)));
+        }
+        return column.withComment(
+                deserializeOptionalField(
+                                jsonNode, COMMENT, String.class, jsonParser.getCodec(), ctx)
+                        .orElse(null));
+    }
+
+    public Column.PhysicalColumn deserializePhysicalColumn(

Review comment:
       `private static`

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSourceSpec.java
##########
@@ -136,13 +137,74 @@ public LookupTableSource getLookupTableSource(FlinkContext flinkContext) {
         }
     }
 
-    public void setTableSource(DynamicTableSource tableSource) {
-        this.tableSource = tableSource;
+    @JsonGetter(FIELD_NAME_CATALOG_TABLE_SPEC)
+    public ContextResolvedTable getContextResolvedTable() {
+        return contextResolvedTable;
     }
 
-    @JsonIgnore
+    @JsonGetter(FIELD_NAME_SOURCE_ABILITY_SPECS)
     @Nullable
     public List<SourceAbilitySpec> getSourceAbilitySpecs() {
         return sourceAbilitySpecs;
     }
+
+    @JsonIgnore
+    public ClassLoader getClassLoader() {
+        return classLoader;
+    }
+
+    @JsonIgnore
+    public ReadableConfig getReadableConfig() {
+        return configuration;
+    }
+
+    public void setTableSource(DynamicTableSource tableSource) {
+        this.tableSource = tableSource;
+    }
+
+    public void setClassLoader(ClassLoader classLoader) {

Review comment:
       we should definitely try to make specs immutable.
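
   A minimal sketch of constructor-based wiring using the existing field-name constants (annotation placement is an assumption):

   ```
   @JsonCreator
   public DynamicTableSourceSpec(
           @JsonProperty(FIELD_NAME_CATALOG_TABLE_SPEC) ContextResolvedTable contextResolvedTable,
           @JsonProperty(FIELD_NAME_SOURCE_ABILITY_SPECS) @Nullable
                   List<SourceAbilitySpec> sourceAbilitySpecs) {
       // Final fields assigned once at construction; no setters afterwards.
       this.contextResolvedTable = contextResolvedTable;
       this.sourceAbilitySpecs = sourceAbilitySpecs;
   }
   ```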

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ObjectIdentifierJsonDeserializer.java
##########
@@ -42,15 +38,28 @@ public ObjectIdentifierJsonDeserializer() {
 
     @Override
     public ObjectIdentifier deserialize(JsonParser jsonParser, DeserializationContext ctx)
-            throws IOException, JsonProcessingException {
-        final JsonNode identifierNode = jsonParser.readValueAsTree();
-        return deserialize(identifierNode);
+            throws IOException {
+        return deserialize(jsonParser.getValueAsString(), SerdeContext.get(ctx));
     }
 
-    public static ObjectIdentifier deserialize(JsonNode identifierNode) {
+    static ObjectIdentifier deserialize(String identifierStr, SerdeContext ctx) {

Review comment:
       This could have been a JIRA issue and PR on its own. It is better to fork PRs instead of having these 13K-line PRs that are hard to review.

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableJsonSerializer.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ExternalCatalogTable;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+/**
+ * This serializer can be configured via an attribute to serialize or not the options and comments,
+ * setting the attribute {@link #SERIALIZE_OPTIONS} to {@code true} or {@code false}.
+ */
+class ResolvedCatalogTableJsonSerializer extends StdSerializer<ResolvedCatalogTable> {
+    private static final long serialVersionUID = 1L;
+
+    static final String SERIALIZE_OPTIONS = "serialize_options";
+
+    public static final String RESOLVED_SCHEMA = "resolvedSchema";
+    public static final String PARTITION_KEYS = "partitionKeys";
+    public static final String OPTIONS = "options";
+    public static final String COMMENT = "comment";
+
+    public ResolvedCatalogTableJsonSerializer() {
+        super(ResolvedCatalogTable.class);
+    }
+
+    @Override
+    public void serialize(
+            ResolvedCatalogTable resolvedCatalogTable,
+            JsonGenerator jsonGenerator,
+            SerializerProvider serializerProvider)
+            throws IOException {
+        // This should never happen anyway, but we keep this assertion as a sanity check
+        assert resolvedCatalogTable.getTableKind() == CatalogBaseTable.TableKind.TABLE;
+
+        boolean serializeOptions =
+                serializerProvider.getAttribute(SERIALIZE_OPTIONS) == null
+                        || (boolean) serializerProvider.getAttribute(SERIALIZE_OPTIONS);
+
+        jsonGenerator.writeStartObject();
+
+        if (resolvedCatalogTable.getOrigin() instanceof ExternalCatalogTable) {
+            throw new ValidationException(
+                    "Cannot serialize the table as it's an external inline table. "
+                            + "This might be caused by a usage of "
+                            + "StreamTableEnvironment#fromDataStream or TableResult#collect, "
+                            + "which are not supported by the persisted plan");

Review comment:
       nit: add a dot at the end of exception messages

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSourceSpec.java
##########
@@ -81,19 +84,19 @@ private DynamicTableSource getTableSource(FlinkContext flinkContext) {
 
             tableSource =
                     FactoryUtil.createDynamicTableSource(
-                            // TODO Support creating from a catalog
                             factory,
-                            objectIdentifier,
-                            catalogTable,
+                            contextResolvedTable.getIdentifier(),
+                            contextResolvedTable.getResolvedTable(),
+                            SpecUtil.loadOptionsFromCatalogTable(

Review comment:
       we should not add too many utils. I think this method would be a good candidate for an upper class `DynamicTableSpec`?
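
   A rough sketch of such a base class (name as suggested, everything else hypothetical):

   ```
   abstract class DynamicTableSpec {
       protected final ContextResolvedTable contextResolvedTable;

       protected DynamicTableSpec(ContextResolvedTable contextResolvedTable) {
           this.contextResolvedTable = contextResolvedTable;
       }

       // Shared by source and sink specs instead of a static util method.
       protected Map<String, String> loadOptionsFromCatalogTable(FlinkContext flinkContext) {
           // Placeholder: the logic currently in the util method would move here.
           return contextResolvedTable.getResolvedTable().getOptions();
       }
   }
   ```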

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedExpressionJsonSerializer.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.planner.expressions.RexNodeExpression;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+class ResolvedExpressionJsonSerializer extends StdSerializer<ResolvedExpression> {
+
+    public static final String TYPE = "type";
+    public static final String TYPE_REX_NODE_EXPRESSION = "rexNodeExpression";
+    public static final String REX_NODE = "rexNode";
+    public static final String OUTPUT_DATA_TYPE = "outputDataType";
+    public static final String SERIALIZABLE_STRING = "serializableString";
+
+    protected ResolvedExpressionJsonSerializer() {
+        super(ResolvedExpression.class);
+    }
+
+    @Override
+    public void serialize(
+            ResolvedExpression resolvedExpression,
+            JsonGenerator jsonGenerator,
+            SerializerProvider serializerProvider)
+            throws IOException {
+        jsonGenerator.writeStartObject();
+
+        if (resolvedExpression instanceof RexNodeExpression) {
+            serialize((RexNodeExpression) resolvedExpression, jsonGenerator, serializerProvider);
+        } else {
+            throw new ValidationException(
+                    String.format(
+                            "Expression '%s' cannot be serialized. "
+                                    + "Currently, only SQL expressions can be serialized in the persisted plan.",
+                            resolvedExpression.asSummaryString()));
+        }
+
+        jsonGenerator.writeEndObject();
+    }
+
+    private void serialize(
+            RexNodeExpression expression,
+            JsonGenerator jsonGenerator,
+            SerializerProvider serializerProvider)
+            throws IOException {
+        jsonGenerator.writeStringField(TYPE, TYPE_REX_NODE_EXPRESSION);
+        serializerProvider.defaultSerializeField(REX_NODE, expression.getRexNode(), jsonGenerator);
+        serializerProvider.defaultSerializeField(

Review comment:
       Only serialize the logical type; this should be enough. It might have been a mistake to let `Expression` return `DataType`, but we should not let this mistake bubble into the persisted plan.
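
   A sketch of the narrowed serialization, assuming the conversion class can be re-derived on restore:

   ```
   // Persist only the logical type; the conversion class of the DataType is not
   // part of the plan contract.
   serializerProvider.defaultSerializeField(
           OUTPUT_DATA_TYPE,
           expression.getOutputDataType().getLogicalType(),
           jsonGenerator);
   ```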

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.catalog.Constraint.ConstraintType;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+abstract class UniqueConstraintMixin {

Review comment:
       Link to the class that this mixin references in the JavaDocs.
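
       For example, a sketch (assuming the mixin targets `org.apache.flink.table.catalog.UniqueConstraint`):

       ```
       import org.apache.flink.table.catalog.UniqueConstraint;

       /** Jackson mixin that configures the JSON format of {@link UniqueConstraint}. */
       abstract class UniqueConstraintMixin {
           // @JsonCreator/@JsonProperty annotated members as in this PR
       }
       ```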

##########
File path: flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out
##########
@@ -1,76 +1,110 @@
 {
-   "flinkVersion":"",
-   "nodes":[
-      {
-         "class":"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan",
-         "scanTableSource":{
-            "identifier":{
-               "catalogName":"default_catalog",
-               "databaseName":"default_database",
-               "tableName":"MyTable"
+  "flinkVersion": "",
+  "nodes": [
+    {
+      "class": "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan",
+      "scanTableSource": {
+        "catalogTable": {
+          "identifier": "`default_catalog`.`default_database`.`MyTable`",
+          "catalogTable": {
+            "resolvedSchema": {
+              "columns": [
+                {
+                  "name": "a",
+                  "type": "physical",
+                  "dataType": "BIGINT"
+                },
+                {
+                  "name": "b",
+                  "type": "physical",

Review comment:
       The type `computed` is implicit if there is an expression; likewise, `metadata` is implicit if there is a metadata key.
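
       In other words, the plan could encode the column kind implicitly, roughly like this (property names are illustrative):

       ```
       [
         { "name": "a", "dataType": "BIGINT" },
         { "name": "c", "expression": "`a` + 1" },
         { "name": "m", "metadataKey": "timestamp", "dataType": "TIMESTAMP(3)" }
       ]
       ```

       A column with neither `expression` nor `metadataKey` is physical, so the explicit `type` field becomes redundant.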

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableJsonDeserializer.java
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanRestore;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.ContextResolvedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.ContextResolvedTableJsonSerializer.FIELD_NAME_CATALOG_TABLE;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.ContextResolvedTableJsonSerializer.FIELD_NAME_IDENTIFIER;
+import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedCatalogTableJsonSerializer.OPTIONS;
+
+class ContextResolvedTableJsonDeserializer extends StdDeserializer<ContextResolvedTable> {
+    private static final long serialVersionUID = 1L;
+
+    public ContextResolvedTableJsonDeserializer() {
+        super(ContextResolvedTable.class);
+    }
+
+    @Override
+    public ContextResolvedTable deserialize(JsonParser jsonParser, DeserializationContext ctx)
+            throws IOException {
+        final CatalogPlanRestore planRestoreOption =
+                SerdeContext.get(ctx)
+                        .getConfiguration()
+                        .get(TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS);
+        final CatalogManager catalogManager =
+                SerdeContext.get(ctx).getFlinkContext().getCatalogManager();
+        final ObjectNode objectNode = jsonParser.readValueAsTree();
+
+        // Deserialize the two fields, if available
+        final ObjectIdentifier identifier =
+                JsonSerdeUtil.deserializeOptionalField(
+                                objectNode,
+                                FIELD_NAME_IDENTIFIER,
+                                ObjectIdentifier.class,
+                                jsonParser.getCodec(),
+                                ctx)
+                        .orElse(null);
+        ResolvedCatalogTable resolvedCatalogTable =
+                JsonSerdeUtil.deserializeOptionalField(
+                                objectNode,
+                                FIELD_NAME_CATALOG_TABLE,
+                                ResolvedCatalogTable.class,
+                                jsonParser.getCodec(),
+                                ctx)
+                        .orElse(null);
+
+        if (identifier == null && resolvedCatalogTable == null) {
+            throw new ValidationException(
+                    String.format(
+                            "The input JSON is invalid because it contains neither '%s' nor '%s'.",
+                            FIELD_NAME_IDENTIFIER, FIELD_NAME_CATALOG_TABLE));
+        }
+
+        if (identifier == null) {
+            if (isLookupForced(planRestoreOption)) {
+                throw missingIdentifier();
+            }
+            return ContextResolvedTable.anonymous(resolvedCatalogTable);
+        }
+
+        Optional<ContextResolvedTable> contextResolvedTableFromCatalog =
+                isLookupEnabled(planRestoreOption)
+                        ? catalogManager.getTable(identifier)
+                        : Optional.empty();
+
+        // If we have a schema from the plan and from the catalog, we need to check they match.
+        if (contextResolvedTableFromCatalog.isPresent() && resolvedCatalogTable != null) {
+            ResolvedSchema schemaFromPlan = resolvedCatalogTable.getResolvedSchema();
+            ResolvedSchema schemaFromCatalog =
+                    contextResolvedTableFromCatalog.get().getResolvedSchema();
+            if (!areResolvedSchemasEqual(schemaFromPlan, schemaFromCatalog)) {
+                throw schemaNotMatching(identifier, schemaFromPlan, schemaFromCatalog);
+            }
+        }
+
+        if (resolvedCatalogTable == null || isLookupForced(planRestoreOption)) {
+            if (!isLookupEnabled(planRestoreOption)) {
+                throw lookupDisabled(identifier);
+            }
+            // We use what is stored inside the catalog
+            return contextResolvedTableFromCatalog.orElseThrow(
+                    () -> missingTableFromCatalog(identifier));
+        }
+
+        if (contextResolvedTableFromCatalog.isPresent()) {
+            // If no config map is present, then the ContextResolvedTable was serialized with
+            // SCHEMA, so we just need to return the catalog query result
+            if (objectNode.at("/" + FIELD_NAME_CATALOG_TABLE + "/" + OPTIONS).isMissingNode()) {
+                return contextResolvedTableFromCatalog.get();
+            }
+
+            return contextResolvedTableFromCatalog
+                    .flatMap(ContextResolvedTable::getCatalog)
+                    .map(c -> ContextResolvedTable.permanent(identifier, c, resolvedCatalogTable))
+                    .orElseGet(
+                            () -> ContextResolvedTable.temporary(identifier, resolvedCatalogTable));
+        }
+
+        return ContextResolvedTable.temporary(identifier, resolvedCatalogTable);
+    }
+
+    private boolean areResolvedSchemasEqual(
+            ResolvedSchema schemaFromPlan, ResolvedSchema schemaFromCatalog) {
+        // For schema equality we check:
+        //  * Columns size and order
+        //  * For each column: name, kind (class) and type
+        //  * Check partition keys set equality
+        @SuppressWarnings("rawtypes")
+        List<Tuple3<String, Class, DataType>> columnsFromPlan =

Review comment:
       Can we avoid using `Tuple3`? It is a class from the DataStream API, and there is a reason why Java has no tuples: usually, there is a better alternative. In this case, a `for` loop with an "early out".
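
       A minimal sketch of that alternative (assuming `ResolvedSchema#getColumns` and the `Column` accessors used below; the partition key check would stay separate):

       ```
       // Requires org.apache.flink.table.catalog.Column, ResolvedSchema and java.util.List.
       private static boolean areColumnsEqual(
               ResolvedSchema schemaFromPlan, ResolvedSchema schemaFromCatalog) {
           final List<Column> columnsFromPlan = schemaFromPlan.getColumns();
           final List<Column> columnsFromCatalog = schemaFromCatalog.getColumns();
           if (columnsFromPlan.size() != columnsFromCatalog.size()) {
               return false;
           }
           for (int i = 0; i < columnsFromPlan.size(); i++) {
               final Column fromPlan = columnsFromPlan.get(i);
               final Column fromCatalog = columnsFromCatalog.get(i);
               // Early out on the first mismatch of name, kind (class) or data type.
               if (!fromPlan.getName().equals(fromCatalog.getName())
                       || fromPlan.getClass() != fromCatalog.getClass()
                       || !fromPlan.getDataType().equals(fromCatalog.getDataType())) {
                   return false;
               }
           }
           return true;
       }
       ```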

##########
File path: flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out
##########
@@ -1,76 +1,110 @@
 {
-   "flinkVersion":"",
-   "nodes":[
-      {
-         "class":"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan",
-         "scanTableSource":{
-            "identifier":{
-               "catalogName":"default_catalog",
-               "databaseName":"default_database",
-               "tableName":"MyTable"
+  "flinkVersion": "",
+  "nodes": [
+    {
+      "class": "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan",
+      "scanTableSource": {
+        "catalogTable": {
+          "identifier": "`default_catalog`.`default_database`.`MyTable`",
+          "catalogTable": {
+            "resolvedSchema": {
+              "columns": [
+                {
+                  "name": "a",
+                  "type": "physical",
+                  "dataType": "BIGINT"
+                },
+                {
+                  "name": "b",
+                  "type": "physical",
+                  "dataType": "INT"
+                },
+                {
+                  "name": "c",
+                  "type": "physical",
+                  "dataType": {
+                    "logicalType": "VARCHAR(2147483647)",

Review comment:
       Let's only serialize the logical type here as well.
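
       The entry above would then collapse to something like:

       ```
       { "name": "c", "type": "physical", "dataType": "VARCHAR(2147483647)" }
       ```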

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ConverterDelegatingDeserializer.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.BeanDescription;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationConfig;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.DelegatingDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Deserializer which delegates to the default {@link BeanDeserializer} and then executes custom
+ * code to perform a conversion to another final value.
+ *
+ * <p>Use the {@link Converter} when you want to use Jackson annotations for defining serializers
+ * and deserializers, but after the deserialization you need to perform an additional transformation
+ * step that doesn't depend on the original JSON, e.g. enrich the output value with info from {@link
+ * SerdeContext}.
+ */
+class ConverterDelegatingDeserializer<T, R> extends DelegatingDeserializer {

Review comment:
       I'm pretty sure we don't need this class. Let's have an offline chat about it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org