You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/26 03:02:33 UTC

[GitHub] [flink] wenlong88 commented on a change in pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

wenlong88 commented on a change in pull request #14729:
URL: https://github.com/apache/flink/pull/14729#discussion_r564170827



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java
##########
@@ -21,39 +21,97 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /** The representation of an edge connecting two {@link ExecNode}. */
 @Internal
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class ExecEdge {
 
     public static final ExecEdge DEFAULT = ExecEdge.builder().build();
 
+    public static final String FIELD_NAME_REQUIRED_SHUFFLE = "requiredShuffle";
+    public static final String FIELD_NAME_DAM_BEHAVIOR = "damBehavior";
+    public static final String FIELD_NAME_PRIORITY = "priority";
+
+    @JsonProperty(FIELD_NAME_REQUIRED_SHUFFLE)
+    @JsonSerialize(using = RequiredShuffleJsonSerializer.class)
+    @JsonDeserialize(using = RequiredShuffleJsonDeserializer.class)

Review comment:
       why do we need the serializer? I think just add json annotation to RequiredShuffle may be enough?

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/CatalogTableSpecBase.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** {@link CatalogTableSpecBase} describes how to serialize/deserialize a catalog table. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CatalogTableSpecBase {
+    public static final String FIELD_NAME_IDENTIFIER = "identifier";
+    public static final String FIELD_NAME_CATALOG_NAME = "catalogName";
+    public static final String FIELD_NAME_DATABASE_NAME = "databaseName";
+    public static final String FIELD_NAME_TABLE_NAME = "tableName";
+
+    public static final String FIELD_NAME_CATALOG_TABLE = "catalogTable";
+    public static final String FIELD_NAME_CONFIGURATION = "configuration";
+
+    @JsonProperty(value = FIELD_NAME_IDENTIFIER, required = true)
+    @JsonSerialize(using = ObjectIdentifierJsonSerializer.class)
+    @JsonDeserialize(using = ObjectIdentifierJsonDeserializer.class)
+    protected final ObjectIdentifier objectIdentifier;
+
+    @JsonProperty(value = FIELD_NAME_CATALOG_TABLE, required = true)
+    @JsonSerialize(using = CatalogTableJsonSerializer.class)
+    @JsonDeserialize(using = CatalogTableJsonDeserializer.class)
+    protected final CatalogTable catalogTable;
+
+    @JsonProperty(value = FIELD_NAME_CONFIGURATION, required = true)
+    @JsonSerialize(using = ReadableConfigJsonSerializer.class)
+    @JsonDeserialize(using = ReadableConfigJsonDeserializer.class)
+    protected final ReadableConfig configuration;

Review comment:
       the configuration here is the table config, I think it is not good to persist configuration per table.

##########
File path: flink-table/flink-table-planner-blink/pom.xml
##########
@@ -418,6 +419,11 @@ under the License.
 									<shadedPattern>org.apache.flink.table.shaded.org.codehaus</shadedPattern>
 								</relocation>-->
 
+								<relocation>
+									<pattern>org.reflections</pattern>
+									<shadedPattern>org.apache.flink.calcite.shaded.org.reflections</shadedPattern>

Review comment:
       why do we need to add the shading? the shading pattern should start with be org.apache.flink.table?

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeSerdeTest.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.table.expressions.TimeIntervalUnit;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.SymbolType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.TypeInformationRawType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests for {@link LogicalType} serialization and deserialization. */
+@RunWith(Parameterized.class)
+public class LogicalTypeSerdeTest {

Review comment:
       we may also need to test the coverage to make sure that all of the logicalType is supported.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
##########
@@ -36,46 +46,97 @@
  *
  * @param <T> The type of the elements that result from this node.
  */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "class")
 public abstract class ExecNodeBase<T> implements ExecNode<T> {
 
-    private final String description;
-    private final List<ExecEdge> inputEdges;
-    private final LogicalType outputType;
+    public static final String FIELD_NAME_ID = "id";
+    public static final String FIELD_NAME_CLASS = "class";
+    public static final String FIELD_NAME_DESCRIPTION = "description";
+    public static final String FIELD_NAME_INPUT_EDGES = "inputEdges";
+    public static final String FIELD_NAME_OUTPUT_TYPE = "outputType";
+    public static final String FILED_NAME_INPUTS = "inputs";
+
+    /** The unique identifier for each ExecNode in the json plan. */
+    @JsonIgnore private final int id;
+
+    @JsonIgnore private final String description;
+
+    @JsonIgnore private final List<ExecEdge> inputEdges;
+
+    @JsonIgnore private final LogicalType outputType;
+
     // TODO remove this field once edge support `source` and `target`,
     //  and then we can get/set `inputNodes` through `inputEdges`.
-    private List<ExecNode<?>> inputNodes;
+    @JsonIgnore private List<ExecNode<?>> inputNodes;
 
-    private transient Transformation<T> transformation;
+    @JsonIgnore private transient Transformation<T> transformation;
 
-    protected ExecNodeBase(List<ExecEdge> inputEdges, LogicalType outputType, String description) {
+    /** This is used to assign a unique ID to every ExecNode. */
+    private static Integer idCounter = 0;
+
+    /** Generate an unique ID for ExecNode. */
+    public static int getNewNodeId() {
+        idCounter++;
+        return idCounter;
+    }
+
+    // used for json creator
+    protected ExecNodeBase(
+            int id, List<ExecEdge> inputEdges, LogicalType outputType, String description) {
+        this.id = id;
         this.inputEdges = new ArrayList<>(checkNotNull(inputEdges));
         this.outputType = checkNotNull(outputType);
         this.description = checkNotNull(description);
     }
 
+    protected ExecNodeBase(List<ExecEdge> inputEdges, LogicalType outputType, String description) {
+        this(getNewNodeId(), inputEdges, outputType, description);
+    }
+
+    @JsonProperty(value = FIELD_NAME_ID)
+    @Override
+    public final int getId() {
+        return id;
+    }
+
+    /** The name used to identify each sub-class in the json plan. */
+    @JsonProperty(value = FIELD_NAME_CLASS, access = JsonProperty.Access.READ_ONLY)
+    public final String getClassName() {
+        return getClass().getCanonicalName();
+    }
+
+    @JsonProperty(value = FIELD_NAME_DESCRIPTION)

Review comment:
       why not just add annotation on fields, I think it is not so straight-forward to add on get Method

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/CatalogTableSpecBase.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** {@link CatalogTableSpecBase} describes how to serialize/deserialize a catalog table. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CatalogTableSpecBase {

Review comment:
       move spec class to utils or create a new packages for specs?

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerializer.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SymbolType;
+import org.apache.flink.table.types.logical.TypeInformationRawType;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.table.utils.TypeStringUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+/**
+ * JSON serializer for {@link LogicalType}. refer to {@link LogicalTypeJsonDeserializer} for
+ * deserializer.
+ */
+public class LogicalTypeJsonSerializer extends StdSerializer<LogicalType> {
+    private static final long serialVersionUID = 1L;
+
+    public static final String FIELD_NAME_NULLABLE = "nullable";
+    public static final String FIELD_NAME_SYMBOL_CLASS = "symbolClass";
+    public static final String FIELD_NAME_TYPE_INFO = "typeInfo";
+
+    public LogicalTypeJsonSerializer() {
+        super(LogicalType.class);
+    }
+
+    @Override
+    public void serialize(
+            LogicalType logicalType,
+            JsonGenerator jsonGenerator,
+            SerializerProvider serializerProvider)
+            throws IOException {
+        if (logicalType instanceof SymbolType) {
+            // SymbolType does not support `asSerializableString`
+            SymbolType<?> symbolType = (SymbolType<?>) logicalType;
+            jsonGenerator.writeStartObject();
+            jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, symbolType.isNullable());
+            jsonGenerator.writeStringField(
+                    FIELD_NAME_SYMBOL_CLASS, symbolType.getDefaultConversion().getCanonicalName());
+            jsonGenerator.writeEndObject();
+        } else if (logicalType instanceof TypeInformationRawType) {
+            // TypeInformationRawType does not support `asSerializableString`
+            TypeInformationRawType<?> rawType = (TypeInformationRawType<?>) logicalType;
+            jsonGenerator.writeStartObject();
+            jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, rawType.isNullable());
+            jsonGenerator.writeStringField(
+                    FIELD_NAME_TYPE_INFO,
+                    EncodingUtils.escapeSingleQuotes(
+                            TypeStringUtils.writeTypeInfo(rawType.getTypeInformation())));

Review comment:
       how about use base64 encoding? I thinks it would be safer than just escaping single quote

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpec.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import java.io.IOException;
+
+/**
+ * {@link DynamicTableSourceSpec} describes how to serialize/deserialize dynamic table source table
+ * and create {@link DynamicTableSource} from the deserialization result.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DynamicTableSourceSpec extends CatalogTableSpecBase {
+
+    @JsonIgnore private DynamicTableSource tableSource;

Review comment:
       I am afraid that it would not work well when there is something pushed down to the table source, we can not rebuild it from catalog table




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org