Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/22 13:24:25 UTC

[GitHub] [flink] godfreyhe opened a new pull request #14729: [FLINK-21092][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

godfreyhe opened a new pull request #14729:
URL: https://github.com/apache/flink/pull/14729


   
   ## What is the purpose of the change
   
   *This PR adds support for getting and executing the ExecNode plan for the SQL statement `INSERT INTO MySink SELECT * FROM MyTable`.*
   
   
   ## Brief change log
     - *Introduce getJsonPlan, explainJsonPlan and executeJsonPlan in TableEnvironmentInternal*
     - *Support StreamExecTableSource json serialization/deserialization*
     - *Support StreamExecSink json serialization/deserialization*
     - *Introduce ExecNodeGraphJsonPlanGenerator to serialize ExecNodeGraph to json plan and deserialize json plan to ExecNodeGraph*
   
   
   ## Verifying this change
   
   
   This change added tests and can be verified as follows:
   
     - *Added unit tests to verify the serialization/deserialization for different classes, such as LogicalTypeSerdeTest, ExecEdgeSerdeTest*
     - *Extended the integration tests to verify getJsonPlan, explainJsonPlan and executeJsonPlan on the `INSERT INTO MySink SELECT * FROM MyTable` statement*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (**yes** / no)
     - If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14729:
URL: https://github.com/apache/flink/pull/14729#issuecomment-765399004


   ## CI report:
   
   * b39c8c7d0f68afd25fec0cf79b7fdf9f0d8d37d7 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12492) 
   * 8e7fedb102a0d654ecd59f862e9b912471c2386f UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] godfreyhe commented on a change in pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #14729:
URL: https://github.com/apache/flink/pull/14729#discussion_r564961534



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/CatalogTableSpecBase.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** {@link CatalogTableSpecBase} describes how to serialize/deserialize a catalog table. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CatalogTableSpecBase {
+    public static final String FIELD_NAME_IDENTIFIER = "identifier";
+    public static final String FIELD_NAME_CATALOG_NAME = "catalogName";
+    public static final String FIELD_NAME_DATABASE_NAME = "databaseName";
+    public static final String FIELD_NAME_TABLE_NAME = "tableName";
+
+    public static final String FIELD_NAME_CATALOG_TABLE = "catalogTable";
+    public static final String FIELD_NAME_CONFIGURATION = "configuration";
+
+    @JsonProperty(value = FIELD_NAME_IDENTIFIER, required = true)
+    @JsonSerialize(using = ObjectIdentifierJsonSerializer.class)
+    @JsonDeserialize(using = ObjectIdentifierJsonDeserializer.class)
+    protected final ObjectIdentifier objectIdentifier;
+
+    @JsonProperty(value = FIELD_NAME_CATALOG_TABLE, required = true)
+    @JsonSerialize(using = CatalogTableJsonSerializer.class)
+    @JsonDeserialize(using = CatalogTableJsonDeserializer.class)
+    protected final CatalogTable catalogTable;
+
+    @JsonProperty(value = FIELD_NAME_CONFIGURATION, required = true)
+    @JsonSerialize(using = ReadableConfigJsonSerializer.class)
+    @JsonDeserialize(using = ReadableConfigJsonDeserializer.class)
+    protected final ReadableConfig configuration;

Review comment:
       Yes, we can persist the configuration once at the top level of the JSON plan.
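
One way to picture the suggestion above is a plan object that carries the configuration once, while each table spec holds only its identifier and options. The sketch below is hypothetical: `PlanSketch`, `TableSpecSketch`, and the flattened key names are illustrative assumptions and are not the classes or the JSON layout used in this PR.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: configuration is stored once per plan, not once per table. */
public class PlanSketch {

    /** Illustrative stand-in for a per-table spec; it carries no configuration of its own. */
    public static final class TableSpecSketch {
        final String identifier;
        final Map<String, String> options;

        public TableSpecSketch(String identifier, Map<String, String> options) {
            this.identifier = identifier;
            this.options = options;
        }
    }

    private final Map<String, String> configuration;
    private final List<TableSpecSketch> tables;

    public PlanSketch(Map<String, String> configuration, List<TableSpecSketch> tables) {
        this.configuration = configuration;
        this.tables = tables;
    }

    /** Flattens the plan into key/value pairs; each configuration key appears exactly once. */
    public Map<String, String> toFlatMap() {
        Map<String, String> out = new LinkedHashMap<>();
        configuration.forEach((k, v) -> out.put("configuration." + k, v));
        for (int i = 0; i < tables.size(); i++) {
            final int idx = i; // effectively final copy for use inside the lambda
            TableSpecSketch t = tables.get(i);
            out.put("tables." + idx + ".identifier", t.identifier);
            t.options.forEach((k, v) -> out.put("tables." + idx + ".options." + k, v));
        }
        return out;
    }
}
```

With two tables and one configuration entry, the flattened plan contains the configuration key once rather than once per table, which is the size and consistency benefit the comment points at.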








[GitHub] [flink] flinkbot edited a comment on pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14729:
URL: https://github.com/apache/flink/pull/14729#issuecomment-765399004


   ## CI report:
   
   * 5d701796fc88f998c12bc58ada37b0248ea978c7 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12432) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] godfreyhe commented on a change in pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #14729:
URL: https://github.com/apache/flink/pull/14729#discussion_r564623272



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java
##########
@@ -21,39 +21,97 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /** The representation of an edge connecting two {@link ExecNode}. */
 @Internal
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class ExecEdge {
 
     public static final ExecEdge DEFAULT = ExecEdge.builder().build();
 
+    public static final String FIELD_NAME_REQUIRED_SHUFFLE = "requiredShuffle";
+    public static final String FIELD_NAME_DAM_BEHAVIOR = "damBehavior";
+    public static final String FIELD_NAME_PRIORITY = "priority";
+
+    @JsonProperty(FIELD_NAME_REQUIRED_SHUFFLE)
+    @JsonSerialize(using = RequiredShuffleJsonSerializer.class)
+    @JsonDeserialize(using = RequiredShuffleJsonDeserializer.class)

Review comment:
       A JSON annotation is enough for ExecEdge for now, but I am doing some refactoring in #14757, after which it will need a dedicated serializer/deserializer. I can revert them here and introduce them once #14757 is finished.







[GitHub] [flink] wenlong88 commented on a change in pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
wenlong88 commented on a change in pull request #14729:
URL: https://github.com/apache/flink/pull/14729#discussion_r564170827



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java
##########
@@ -21,39 +21,97 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /** The representation of an edge connecting two {@link ExecNode}. */
 @Internal
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class ExecEdge {
 
     public static final ExecEdge DEFAULT = ExecEdge.builder().build();
 
+    public static final String FIELD_NAME_REQUIRED_SHUFFLE = "requiredShuffle";
+    public static final String FIELD_NAME_DAM_BEHAVIOR = "damBehavior";
+    public static final String FIELD_NAME_PRIORITY = "priority";
+
+    @JsonProperty(FIELD_NAME_REQUIRED_SHUFFLE)
+    @JsonSerialize(using = RequiredShuffleJsonSerializer.class)
+    @JsonDeserialize(using = RequiredShuffleJsonDeserializer.class)

Review comment:
       Why do we need the serializer? I think adding a JSON annotation to RequiredShuffle may be enough.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/CatalogTableSpecBase.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** {@link CatalogTableSpecBase} describes how to serialize/deserialize a catalog table. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CatalogTableSpecBase {
+    public static final String FIELD_NAME_IDENTIFIER = "identifier";
+    public static final String FIELD_NAME_CATALOG_NAME = "catalogName";
+    public static final String FIELD_NAME_DATABASE_NAME = "databaseName";
+    public static final String FIELD_NAME_TABLE_NAME = "tableName";
+
+    public static final String FIELD_NAME_CATALOG_TABLE = "catalogTable";
+    public static final String FIELD_NAME_CONFIGURATION = "configuration";
+
+    @JsonProperty(value = FIELD_NAME_IDENTIFIER, required = true)
+    @JsonSerialize(using = ObjectIdentifierJsonSerializer.class)
+    @JsonDeserialize(using = ObjectIdentifierJsonDeserializer.class)
+    protected final ObjectIdentifier objectIdentifier;
+
+    @JsonProperty(value = FIELD_NAME_CATALOG_TABLE, required = true)
+    @JsonSerialize(using = CatalogTableJsonSerializer.class)
+    @JsonDeserialize(using = CatalogTableJsonDeserializer.class)
+    protected final CatalogTable catalogTable;
+
+    @JsonProperty(value = FIELD_NAME_CONFIGURATION, required = true)
+    @JsonSerialize(using = ReadableConfigJsonSerializer.class)
+    @JsonDeserialize(using = ReadableConfigJsonDeserializer.class)
+    protected final ReadableConfig configuration;

Review comment:
       The configuration here is the table config. I don't think it is good to persist the configuration once per table.

##########
File path: flink-table/flink-table-planner-blink/pom.xml
##########
@@ -418,6 +419,11 @@ under the License.
 									<shadedPattern>org.apache.flink.table.shaded.org.codehaus</shadedPattern>
 								</relocation>-->
 
+								<relocation>
+									<pattern>org.reflections</pattern>
+									<shadedPattern>org.apache.flink.calcite.shaded.org.reflections</shadedPattern>

Review comment:
       Why do we need to add this shading? Shouldn't the shaded pattern start with org.apache.flink.table?

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeSerdeTest.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.table.expressions.TimeIntervalUnit;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.SymbolType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.TypeInformationRawType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests for {@link LogicalType} serialization and deserialization. */
+@RunWith(Parameterized.class)
+public class LogicalTypeSerdeTest {

Review comment:
       We may also need a coverage test to make sure that every LogicalType is supported.
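
A coverage check of this kind could compare the set of type roots exercised by the test data against all constants of the enum. The sketch below is only an illustration under assumptions: `TypeRoot` is a small hypothetical enum standing in for something like `LogicalTypeRoot`, and `uncovered` is an invented helper, not part of this PR or of Flink.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of a coverage check; TypeRoot stands in for an enum such as LogicalTypeRoot. */
public class CoverageCheckSketch {

    /** Illustrative enum; the real set of type roots is much larger. */
    public enum TypeRoot { BOOLEAN, INTEGER, VARCHAR, TIMESTAMP }

    /** Returns the enum constants that none of the given test cases exercises. */
    public static Set<TypeRoot> uncovered(List<TypeRoot> testedRoots) {
        Set<TypeRoot> missing = EnumSet.allOf(TypeRoot.class);
        missing.removeAll(testedRoots);
        return missing;
    }
}
```

A test could then assert that `uncovered(...)` is empty, so that adding a new type root fails the build until a matching serde test case exists.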

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
##########
@@ -36,46 +46,97 @@
  *
  * @param <T> The type of the elements that result from this node.
  */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "class")
 public abstract class ExecNodeBase<T> implements ExecNode<T> {
 
-    private final String description;
-    private final List<ExecEdge> inputEdges;
-    private final LogicalType outputType;
+    public static final String FIELD_NAME_ID = "id";
+    public static final String FIELD_NAME_CLASS = "class";
+    public static final String FIELD_NAME_DESCRIPTION = "description";
+    public static final String FIELD_NAME_INPUT_EDGES = "inputEdges";
+    public static final String FIELD_NAME_OUTPUT_TYPE = "outputType";
+    public static final String FILED_NAME_INPUTS = "inputs";
+
+    /** The unique identifier for each ExecNode in the json plan. */
+    @JsonIgnore private final int id;
+
+    @JsonIgnore private final String description;
+
+    @JsonIgnore private final List<ExecEdge> inputEdges;
+
+    @JsonIgnore private final LogicalType outputType;
+
     // TODO remove this field once edge support `source` and `target`,
     //  and then we can get/set `inputNodes` through `inputEdges`.
-    private List<ExecNode<?>> inputNodes;
+    @JsonIgnore private List<ExecNode<?>> inputNodes;
 
-    private transient Transformation<T> transformation;
+    @JsonIgnore private transient Transformation<T> transformation;
 
-    protected ExecNodeBase(List<ExecEdge> inputEdges, LogicalType outputType, String description) {
+    /** This is used to assign a unique ID to every ExecNode. */
+    private static Integer idCounter = 0;
+
+    /** Generate an unique ID for ExecNode. */
+    public static int getNewNodeId() {
+        idCounter++;
+        return idCounter;
+    }
+
+    // used for json creator
+    protected ExecNodeBase(
+            int id, List<ExecEdge> inputEdges, LogicalType outputType, String description) {
+        this.id = id;
         this.inputEdges = new ArrayList<>(checkNotNull(inputEdges));
         this.outputType = checkNotNull(outputType);
         this.description = checkNotNull(description);
     }
 
+    protected ExecNodeBase(List<ExecEdge> inputEdges, LogicalType outputType, String description) {
+        this(getNewNodeId(), inputEdges, outputType, description);
+    }
+
+    @JsonProperty(value = FIELD_NAME_ID)
+    @Override
+    public final int getId() {
+        return id;
+    }
+
+    /** The name used to identify each sub-class in the json plan. */
+    @JsonProperty(value = FIELD_NAME_CLASS, access = JsonProperty.Access.READ_ONLY)
+    public final String getClassName() {
+        return getClass().getCanonicalName();
+    }
+
+    @JsonProperty(value = FIELD_NAME_DESCRIPTION)

Review comment:
       Why not just put the annotations on the fields? I think putting them on the getter methods is less straightforward.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/CatalogTableSpecBase.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** {@link CatalogTableSpecBase} describes how to serialize/deserialize a catalog table. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CatalogTableSpecBase {

Review comment:
       Should we move the spec classes to utils, or create a new package for specs?

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerializer.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SymbolType;
+import org.apache.flink.table.types.logical.TypeInformationRawType;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.table.utils.TypeStringUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+/**
+ * JSON serializer for {@link LogicalType}. refer to {@link LogicalTypeJsonDeserializer} for
+ * deserializer.
+ */
+public class LogicalTypeJsonSerializer extends StdSerializer<LogicalType> {
+    private static final long serialVersionUID = 1L;
+
+    public static final String FIELD_NAME_NULLABLE = "nullable";
+    public static final String FIELD_NAME_SYMBOL_CLASS = "symbolClass";
+    public static final String FIELD_NAME_TYPE_INFO = "typeInfo";
+
+    public LogicalTypeJsonSerializer() {
+        super(LogicalType.class);
+    }
+
+    @Override
+    public void serialize(
+            LogicalType logicalType,
+            JsonGenerator jsonGenerator,
+            SerializerProvider serializerProvider)
+            throws IOException {
+        if (logicalType instanceof SymbolType) {
+            // SymbolType does not support `asSerializableString`
+            SymbolType<?> symbolType = (SymbolType<?>) logicalType;
+            jsonGenerator.writeStartObject();
+            jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, symbolType.isNullable());
+            jsonGenerator.writeStringField(
+                    FIELD_NAME_SYMBOL_CLASS, symbolType.getDefaultConversion().getCanonicalName());
+            jsonGenerator.writeEndObject();
+        } else if (logicalType instanceof TypeInformationRawType) {
+            // TypeInformationRawType does not support `asSerializableString`
+            TypeInformationRawType<?> rawType = (TypeInformationRawType<?>) logicalType;
+            jsonGenerator.writeStartObject();
+            jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, rawType.isNullable());
+            jsonGenerator.writeStringField(
+                    FIELD_NAME_TYPE_INFO,
+                    EncodingUtils.escapeSingleQuotes(
+                            TypeStringUtils.writeTypeInfo(rawType.getTypeInformation())));

Review comment:
       How about using Base64 encoding? I think it would be safer than just escaping single quotes.
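
       To illustrate the concern, here is a minimal sketch using only the JDK. The class `TypeInfoEncodingSketch` and its methods are hypothetical and not part of this PR, and `escapeSingleQuotes` is only an assumed stand-in for the escaping used in the diff. Escaping leaves other special characters (double quotes, backslashes) in the string, while Base64 output is limited to a fixed safe alphabet and round-trips exactly.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical helper, not part of the PR: compares quote escaping with Base64. */
final class TypeInfoEncodingSketch {

    /** Doubles every single quote; an assumed stand-in for the escaping in the diff. */
    static String escapeSingleQuotes(String s) {
        return s.replace("'", "''");
    }

    /** Encodes the UTF-8 bytes of the string with the standard Base64 alphabet. */
    static String encodeBase64(String s) {
        return Base64.getEncoder().encodeToString(s.getBytes(StandardCharsets.UTF_8));
    }

    /** Reverses encodeBase64. */
    static String decodeBase64(String s) {
        return new String(Base64.getDecoder().decode(s), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Made-up input containing a single quote, a double quote and a backslash.
        String typeInfo = "PojoType<'a\"b\\c'>";
        System.out.println("escaped: " + escapeSingleQuotes(typeInfo));
        System.out.println("base64:  " + encodeBase64(typeInfo));
        System.out.println("decoded: " + decodeBase64(encodeBase64(typeInfo)));
    }
}
```

       The trade-off is that the Base64 form is no longer human-readable in the json plan.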

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpec.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import java.io.IOException;
+
+/**
+ * {@link DynamicTableSourceSpec} describes how to serialize/deserialize dynamic table source table
+ * and create {@link DynamicTableSource} from the deserialization result.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DynamicTableSourceSpec extends CatalogTableSpecBase {
+
+    @JsonIgnore private DynamicTableSource tableSource;

Review comment:
       I am afraid this will not work well when something has been pushed down into the table source: we cannot rebuild the source from the catalog table alone.
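
       A rough sketch of the problem, with deliberately simplified and hypothetical classes (`PushDownSketch`, `Source`, and `rebuildFromCatalog` are not Flink APIs): a source that had a projection pushed down during optimization differs from one recreated purely from catalog information.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical model, not Flink API: a source that remembers a pushed-down projection. */
final class PushDownSketch {

    static final class Source {
        final String connector;
        final List<String> producedFields;

        Source(String connector, List<String> producedFields) {
            this.connector = connector;
            this.producedFields = producedFields;
        }

        /** Mimics projection push-down: returns a copy that produces fewer fields. */
        Source withProjection(List<String> fields) {
            return new Source(connector, fields);
        }
    }

    /** Rebuilds a source from catalog information only (connector name and full schema). */
    static Source rebuildFromCatalog(String connector, List<String> fullSchema) {
        return new Source(connector, fullSchema);
    }

    public static void main(String[] args) {
        List<String> schema = Arrays.asList("a", "b", "c");
        // The planner narrowed the source to a single column.
        Source planned = rebuildFromCatalog("values", schema).withProjection(Arrays.asList("a"));
        // Deserialization that only knows the catalog table gets the full schema back.
        Source restored = rebuildFromCatalog("values", schema);
        System.out.println(planned.producedFields + " vs " + restored.producedFields);
    }
}
```

       In this sketch, the pushed-down state would have to be serialized next to the catalog table and re-applied after the source is recreated.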




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14729:
URL: https://github.com/apache/flink/pull/14729#issuecomment-765399004


   ## CI report:
   
   * 8e7fedb102a0d654ecd59f862e9b912471c2386f Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12786) 
   * 8a6de343620a78f1bb9af2a9892a837c7081c704 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12825) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] godfreyhe commented on a change in pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #14729:
URL: https://github.com/apache/flink/pull/14729#discussion_r564628143



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerializer.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SymbolType;
+import org.apache.flink.table.types.logical.TypeInformationRawType;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.table.utils.TypeStringUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+/**
+ * JSON serializer for {@link LogicalType}. refer to {@link LogicalTypeJsonDeserializer} for
+ * deserializer.
+ */
+public class LogicalTypeJsonSerializer extends StdSerializer<LogicalType> {
+    private static final long serialVersionUID = 1L;
+
+    public static final String FIELD_NAME_NULLABLE = "nullable";
+    public static final String FIELD_NAME_SYMBOL_CLASS = "symbolClass";
+    public static final String FIELD_NAME_TYPE_INFO = "typeInfo";
+
+    public LogicalTypeJsonSerializer() {
+        super(LogicalType.class);
+    }
+
+    @Override
+    public void serialize(
+            LogicalType logicalType,
+            JsonGenerator jsonGenerator,
+            SerializerProvider serializerProvider)
+            throws IOException {
+        if (logicalType instanceof SymbolType) {
+            // SymbolType does not support `asSerializableString`
+            SymbolType<?> symbolType = (SymbolType<?>) logicalType;
+            jsonGenerator.writeStartObject();
+            jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, symbolType.isNullable());
+            jsonGenerator.writeStringField(
+                    FIELD_NAME_SYMBOL_CLASS, symbolType.getDefaultConversion().getCanonicalName());
+            jsonGenerator.writeEndObject();
+        } else if (logicalType instanceof TypeInformationRawType) {
+            // TypeInformationRawType does not support `asSerializableString`
+            TypeInformationRawType<?> rawType = (TypeInformationRawType<?>) logicalType;
+            jsonGenerator.writeStartObject();
+            jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, rawType.isNullable());
+            jsonGenerator.writeStringField(
+                    FIELD_NAME_TYPE_INFO,
+                    EncodingUtils.escapeSingleQuotes(
+                            TypeStringUtils.writeTypeInfo(rawType.getTypeInformation())));

Review comment:
       Please refer to `LegacyTypeInformationType`.







[GitHub] [flink] flinkbot edited a comment on pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14729:
URL: https://github.com/apache/flink/pull/14729#issuecomment-765399004


   ## CI report:
   
   * ef724e57e7676c154c8f0d6da93a811e9d411051 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12402) 
   * 5d701796fc88f998c12bc58ada37b0248ea978c7 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12432) 
   





[GitHub] [flink] wenlong88 commented on a change in pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
wenlong88 commented on a change in pull request #14729:
URL: https://github.com/apache/flink/pull/14729#discussion_r568542445



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/StatementSetImpl.java
##########
@@ -100,4 +101,27 @@ public TableResult execute() {
             operations.clear();
         }
     }
+
+    /**
+     * Get the json plan of the all statements and Tables as a batch.
+     *
+     * <p>The json plan is the string json representation of an optimized ExecNode plan for the
+     * statements and Tables. An ExecNode plan can be serialized to json plan, and a json plan can
+     * be deserialized to an ExecNode plan.
+     *
+     * <p>NOTES: Only the Blink planner supports this method.
+     *
+     * <p><b>NOTES:</b>: This is an experimental feature now.
+     *
+     * @return the string json representation of an optimized ExecNode plan for the statements and
+     *     Tables.
+     */
+    @Experimental
+    public String getJsonPlan() {
+        try {
+            return tableEnvironment.getJsonPlan(operations);
+        } finally {
+            operations.clear();

Review comment:
       I would prefer to keep the operations here, since this method behaves more like explain than execute.
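
       A small sketch of the behavior I have in mind, using a hypothetical stand-in class (`StatementSetSketch` and its method names are made up and are not the actual `StatementSetImpl` API): a read-only call leaves the buffered operations in place, and only execution consumes them.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a statement set; none of these names come from the PR. */
final class StatementSetSketch {
    private final List<String> operations = new ArrayList<>();

    StatementSetSketch add(String statement) {
        operations.add(statement);
        return this;
    }

    /** Read-only, like explain: renders the buffered statements and keeps them. */
    String describePlan() {
        return String.join(";", operations);
    }

    /** Consumes the buffer, like execute: the operations are cleared afterwards. */
    String execute() {
        try {
            return "executed " + operations.size();
        } finally {
            operations.clear();
        }
    }

    int pending() {
        return operations.size();
    }

    public static void main(String[] args) {
        StatementSetSketch set = new StatementSetSketch().add("INSERT 1").add("INSERT 2");
        System.out.println(set.describePlan());
        System.out.println(set.pending());
        System.out.println(set.execute());
        System.out.println(set.pending());
    }
}
```

       With this shape, a caller can inspect the plan and then still execute the same statement set.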

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpec.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import java.io.IOException;
+
+/**
+ * {@link DynamicTableSourceSpec} describes how to serialize/deserialize dynamic table sink table
+ * and create {@link DynamicTableSink} from the deserialization result.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DynamicTableSinkSpec extends CatalogTableSpecBase {
+
+    @JsonIgnore private DynamicTableSink tableSink;
+
+    @JsonCreator

Review comment:
       I think the `@JsonCreator` annotation and `DynamicTableSinkSpecJsonDeserializer` cannot work together.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonPlanGenerator.java
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
+import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor;
+import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitorImpl;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MapperFeature;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.jsontype.NamedType;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
+
+import org.reflections.Reflections;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * An utility class that can generate json plan based on given {@link ExecNodeGraph} or generate
+ * {@link ExecNodeGraph} based on given json plan.
+ */
+public class ExecNodeGraphJsonPlanGenerator {
+
+    /** Generate json plan based on the given {@link ExecNodeGraph}. */
+    public static String generateJsonPlan(ExecNodeGraph execGraph, SerdeContext serdeCtx)
+            throws IOException {
+        final ObjectMapper mapper = new ObjectMapper();
+        final SimpleModule module = new SimpleModule();
+        registerSerializers(module, serdeCtx);
+        mapper.registerModule(module);
+
+        final StringWriter writer = new StringWriter(1024);
+        try (JsonGenerator gen = mapper.getFactory().createGenerator(writer)) {
+            TopologicalExecNodeGraph topologyGraph = new TopologicalExecNodeGraph(execGraph);
+            gen.writeObject(topologyGraph);
+        }
+
+        return writer.toString();
+    }
+
+    /** Generate {@link ExecNodeGraph} based on the given json plan. */
+    @SuppressWarnings({"rawtypes"})
+    public static ExecNodeGraph generateExecNodeGraph(String jsonPlan, SerdeContext serdeCtx)
+            throws IOException {
+        final ObjectMapper mapper = new ObjectMapper();
+        mapper.configure(MapperFeature.USE_GETTERS_AS_SETTERS, false);
+        final SimpleModule module = new SimpleModule();
+        final Set<Class<? extends ExecNodeBase>> nodeClasses = scanSubClassesOfExecNodeBase();
+        nodeClasses.forEach(c -> module.registerSubtypes(new NamedType(c, c.getCanonicalName())));
+        final TopologicalExecNodeGraphDeContext graphCtx = new TopologicalExecNodeGraphDeContext();
+        registerDeserializers(mapper, module, serdeCtx, graphCtx);
+        mapper.registerModule(module);
+
+        final TopologicalExecNodeGraph topologicalGraph =
+                mapper.readValue(jsonPlan, TopologicalExecNodeGraph.class);
+
+        return topologicalGraph.convertToExecNodeGraph(graphCtx);
+    }
+
+    private static void registerSerializers(SimpleModule module, SerdeContext serdeCtx) {
+        module.addSerializer(new ChangelogModeJsonSerializer());
+    }
+
+    private static void registerDeserializers(
+            ObjectMapper mapper,
+            SimpleModule module,
+            SerdeContext serdeCtx,
+            TopologicalExecNodeGraphDeContext graphCtx) {
+        module.addDeserializer(ExecNode.class, new ExecNodeDeserializer(graphCtx, mapper));
+        module.addDeserializer(
+                DynamicTableSourceSpec.class,
+                new DynamicTableSourceSpec.DynamicTableSourceSpecJsonDeserializer(serdeCtx));
+        module.addDeserializer(
+                DynamicTableSinkSpec.class,
+                new DynamicTableSinkSpec.DynamicTableSinkSpecJsonDeserializer(serdeCtx));
+        module.addDeserializer(ChangelogMode.class, new ChangelogModeJsonDeserializer());
+    }
+
+    /**
+     * The json plan representing pojo class.
+     *
+     * <p>This class can be serialize to json plan, or deserialize from json plan.
+     */
+    public static class TopologicalExecNodeGraph {
+        public static final String FIELD_NAME_FLINK_VERSION = "flinkVersion";
+        public static final String FIELD_NAME_NODES = "nodes";
+
+        @JsonProperty(FIELD_NAME_FLINK_VERSION)
+        private final String flinkVersion;
+
+        @JsonProperty(FIELD_NAME_NODES)
+        private final ExecNode<?>[] topologicalOrderingNodes;
+
+        public TopologicalExecNodeGraph(ExecNodeGraph execGraph) {
+            topologicalOrderingNodes = getAllNodesAsTopologicalOrdering(execGraph);
+            this.flinkVersion = execGraph.getFlinkVersion();
+        }
+
+        @JsonCreator
+        public TopologicalExecNodeGraph(
+                @JsonProperty(FIELD_NAME_FLINK_VERSION) String flinkVersion,
+                @JsonProperty(FIELD_NAME_NODES) ExecNode<?>[] topologicalOrderingNodes) {
+            this.flinkVersion = flinkVersion;
+            this.topologicalOrderingNodes = topologicalOrderingNodes;
+        }
+
+        private ExecNode<?>[] getAllNodesAsTopologicalOrdering(ExecNodeGraph execGraph) {
+            final List<ExecNode<?>> allNodes = new ArrayList<>();
+            final Set<Integer> nodesIds = new HashSet<>();
+            // for quick search
+            final Set<ExecNode<?>> visitedNodes = Sets.newIdentityHashSet();
+
+            final ExecNodeVisitor visitor =
+                    new ExecNodeVisitorImpl() {
+                        @Override
+                        public void visit(ExecNode<?> node) {
+                            if (visitedNodes.contains(node)) {
+                                return;
+                            }
+                            super.visitInputs(node);
+
+                            final int id = node.getId();
+                            if (nodesIds.contains(id)) {
+                                throw new TableException(
+                                        String.format(
+                                                "The id: %s is not unique for ExecNode: %s.\nplease check it.",
+                                                id, node.getDesc()));
+                            }
+
+                            allNodes.add(node);
+                            nodesIds.add(id);
+                            visitedNodes.add(node);
+                        }
+                    };
+
+            execGraph.getRootNodes().forEach(visitor::visit);
+            checkArgument(allNodes.size() == nodesIds.size());
+            return allNodes.toArray(new ExecNode<?>[0]);
+        }
+
+        public ExecNodeGraph convertToExecNodeGraph(TopologicalExecNodeGraphDeContext ctx) {
+            for (ExecNode<?> execNode : topologicalOrderingNodes) {
+                List<ExecNode<?>> inputNodes = ctx.getInputNodes(execNode.getId());
+                ((ExecNodeBase<?>) execNode).setInputNodes(inputNodes);
+            }
+            List<ExecNode<?>> rootNodes = ctx.getRootNodes();
+            return new ExecNodeGraph(rootNodes);
+        }
+    }
+
+    /** JSON deserializer for {@link ExecNodeBase}. */
+    public static class ExecNodeDeserializer extends StdDeserializer<ExecNodeBase<?>> {

Review comment:
       I think it would be better to make deserializers like this top-level classes instead of inner classes, because they are so important to the serialization.
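
For readers following the quoted diff: `getAllNodesAsTopologicalOrdering` appears to collect nodes in post-order, so every node is emitted after its inputs and shared nodes are visited once via an identity-based set. Below is a minimal standalone sketch of that idea in plain Java. The `Node` class and `TopologicalOrdering.order` are hypothetical stand-ins, not Flink's `ExecNode` or `ExecNodeVisitor` API, and unlike the diff the sketch marks a node as visited on entry, which gives the same result for an acyclic graph.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

final class TopologicalOrdering {

    /** Hypothetical stand-in for an exec node: an id plus its input nodes. */
    static final class Node {
        final int id;
        final List<Node> inputs;

        Node(int id, Node... inputs) {
            this.id = id;
            this.inputs = Arrays.asList(inputs);
        }
    }

    /** Returns every node reachable from the roots, inputs before consumers. */
    static List<Node> order(List<Node> roots) {
        List<Node> ordered = new ArrayList<>();
        // Identity-based set: a shared node is visited once, regardless of equals().
        Set<Node> visited = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Node root : roots) {
            visit(root, visited, ordered);
        }
        return ordered;
    }

    private static void visit(Node node, Set<Node> visited, List<Node> ordered) {
        if (!visited.add(node)) {
            return; // already handled through another consumer
        }
        for (Node input : node.inputs) {
            visit(input, visited, ordered);
        }
        ordered.add(node); // post-order: added only after all of its inputs
    }

    public static void main(String[] args) {
        Node source = new Node(1);
        Node calc = new Node(2, source);
        Node sinkA = new Node(3, calc);
        Node sinkB = new Node(4, calc);
        StringBuilder ids = new StringBuilder();
        for (Node n : order(Arrays.asList(sinkA, sinkB))) {
            ids.append(n.id).append(' ');
        }
        System.out.println(ids.toString().trim()); // prints "1 2 3 4"
    }
}
```

Because inputs always precede their consumers in the resulting array, a reader of the serialized list can resolve each node's inputs from nodes it has already seen.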




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14729:
URL: https://github.com/apache/flink/pull/14729#issuecomment-765399004


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "6357401af97f8c268fccb433c81c8a80cfe270d3",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12381",
       "triggerID" : "6357401af97f8c268fccb433c81c8a80cfe270d3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 6357401af97f8c268fccb433c81c8a80cfe270d3 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12381) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14729:
URL: https://github.com/apache/flink/pull/14729#issuecomment-765399004


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "6357401af97f8c268fccb433c81c8a80cfe270d3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12381",
       "triggerID" : "6357401af97f8c268fccb433c81c8a80cfe270d3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12402",
       "triggerID" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12402",
       "triggerID" : "765887150",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * ef724e57e7676c154c8f0d6da93a811e9d411051 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12402) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14729:
URL: https://github.com/apache/flink/pull/14729#issuecomment-765399004


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "6357401af97f8c268fccb433c81c8a80cfe270d3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12381",
       "triggerID" : "6357401af97f8c268fccb433c81c8a80cfe270d3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12402",
       "triggerID" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12402",
       "triggerID" : "765887150",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "5d701796fc88f998c12bc58ada37b0248ea978c7",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12432",
       "triggerID" : "5d701796fc88f998c12bc58ada37b0248ea978c7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b39c8c7d0f68afd25fec0cf79b7fdf9f0d8d37d7",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "b39c8c7d0f68afd25fec0cf79b7fdf9f0d8d37d7",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5d701796fc88f998c12bc58ada37b0248ea978c7 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12432) 
   * b39c8c7d0f68afd25fec0cf79b7fdf9f0d8d37d7 UNKNOWN
   



[GitHub] [flink] godfreyhe commented on a change in pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #14729:
URL: https://github.com/apache/flink/pull/14729#discussion_r569080654



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/StatementSetImpl.java
##########
@@ -100,4 +101,27 @@ public TableResult execute() {
             operations.clear();
         }
     }
+
+    /**
+     * Get the json plan of all the statements and Tables as a batch.
+     *
+     * <p>The json plan is the string json representation of an optimized ExecNode plan for the
+     * statements and Tables. An ExecNode plan can be serialized to json plan, and a json plan can
+     * be deserialized to an ExecNode plan.
+     *
+     * <p><b>NOTE:</b> Only the Blink planner supports this method.
+     *
+     * <p><b>NOTE:</b> This is an experimental feature.
+     *
+     * @return the string json representation of an optimized ExecNode plan for the statements and
+     *     Tables.
+     */
+    @Experimental
+    public String getJsonPlan() {
+        try {
+            return tableEnvironment.getJsonPlan(operations);
+        } finally {
+            operations.clear();

Review comment:
       Makes sense.
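
As background for the quoted `getJsonPlan`: the `try { return ... } finally { operations.clear(); }` shape means the buffered operations are emptied whether the call returns normally or throws. The sketch below shows that behavior in isolation. `BufferedStatements` and `getPlan` are hypothetical names used only for illustration and are not part of `StatementSetImpl`.

```java
import java.util.ArrayList;
import java.util.List;

final class BufferedStatements {
    // Hypothetical stand-in for the operations buffered by a statement set.
    private final List<String> operations = new ArrayList<>();

    BufferedStatements add(String statement) {
        operations.add(statement);
        return this;
    }

    int size() {
        return operations.size();
    }

    /** Builds a plan string from the buffer, then always empties the buffer. */
    String getPlan() {
        try {
            if (operations.isEmpty()) {
                throw new IllegalStateException("No statements buffered.");
            }
            // The return value is computed before the finally block runs.
            return String.join(";", operations);
        } finally {
            operations.clear(); // runs on normal return and on exception
        }
    }

    public static void main(String[] args) {
        BufferedStatements set =
                new BufferedStatements().add("INSERT INTO MySink SELECT * FROM MyTable");
        System.out.println(set.getPlan());
        System.out.println(set.size()); // prints 0: the buffer was cleared
    }
}
```

One consequence of this shape is that asking for the plan consumes the buffered statements, so a later call on the same object starts from an empty buffer.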







[GitHub] [flink] flinkbot edited a comment on pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14729:
URL: https://github.com/apache/flink/pull/14729#issuecomment-765399004


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "6357401af97f8c268fccb433c81c8a80cfe270d3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12381",
       "triggerID" : "6357401af97f8c268fccb433c81c8a80cfe270d3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12402",
       "triggerID" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12402",
       "triggerID" : "765887150",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "5d701796fc88f998c12bc58ada37b0248ea978c7",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12432",
       "triggerID" : "5d701796fc88f998c12bc58ada37b0248ea978c7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b39c8c7d0f68afd25fec0cf79b7fdf9f0d8d37d7",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12492",
       "triggerID" : "b39c8c7d0f68afd25fec0cf79b7fdf9f0d8d37d7",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5d701796fc88f998c12bc58ada37b0248ea978c7 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12432) 
   * b39c8c7d0f68afd25fec0cf79b7fdf9f0d8d37d7 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12492) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14729:
URL: https://github.com/apache/flink/pull/14729#issuecomment-765399004


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "6357401af97f8c268fccb433c81c8a80cfe270d3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12381",
       "triggerID" : "6357401af97f8c268fccb433c81c8a80cfe270d3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12402",
       "triggerID" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12402",
       "triggerID" : "765887150",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "5d701796fc88f998c12bc58ada37b0248ea978c7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12432",
       "triggerID" : "5d701796fc88f998c12bc58ada37b0248ea978c7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b39c8c7d0f68afd25fec0cf79b7fdf9f0d8d37d7",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12492",
       "triggerID" : "b39c8c7d0f68afd25fec0cf79b7fdf9f0d8d37d7",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * b39c8c7d0f68afd25fec0cf79b7fdf9f0d8d37d7 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12492) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14729:
URL: https://github.com/apache/flink/pull/14729#issuecomment-765399004


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "6357401af97f8c268fccb433c81c8a80cfe270d3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12381",
       "triggerID" : "6357401af97f8c268fccb433c81c8a80cfe270d3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12402",
       "triggerID" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12402",
       "triggerID" : "765887150",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "5d701796fc88f998c12bc58ada37b0248ea978c7",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "5d701796fc88f998c12bc58ada37b0248ea978c7",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ef724e57e7676c154c8f0d6da93a811e9d411051 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12402) 
   * 5d701796fc88f998c12bc58ada37b0248ea978c7 UNKNOWN
   



[GitHub] [flink] flinkbot edited a comment on pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14729:
URL: https://github.com/apache/flink/pull/14729#issuecomment-765399004


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "6357401af97f8c268fccb433c81c8a80cfe270d3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12381",
       "triggerID" : "6357401af97f8c268fccb433c81c8a80cfe270d3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12402",
       "triggerID" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12402",
       "triggerID" : "765887150",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "5d701796fc88f998c12bc58ada37b0248ea978c7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12432",
       "triggerID" : "5d701796fc88f998c12bc58ada37b0248ea978c7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b39c8c7d0f68afd25fec0cf79b7fdf9f0d8d37d7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12492",
       "triggerID" : "b39c8c7d0f68afd25fec0cf79b7fdf9f0d8d37d7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8e7fedb102a0d654ecd59f862e9b912471c2386f",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12786",
       "triggerID" : "8e7fedb102a0d654ecd59f862e9b912471c2386f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 8e7fedb102a0d654ecd59f862e9b912471c2386f Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12786) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14729:
URL: https://github.com/apache/flink/pull/14729#issuecomment-765399004


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "6357401af97f8c268fccb433c81c8a80cfe270d3",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12381",
       "triggerID" : "6357401af97f8c268fccb433c81c8a80cfe270d3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 6357401af97f8c268fccb433c81c8a80cfe270d3 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12381) 
   * ef724e57e7676c154c8f0d6da93a811e9d411051 UNKNOWN
   



[GitHub] [flink] flinkbot commented on pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14729:
URL: https://github.com/apache/flink/pull/14729#issuecomment-765394656


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 6357401af97f8c268fccb433c81c8a80cfe270d3 (Fri Jan 22 13:27:26 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
   
   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] godfreyhe closed pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
godfreyhe closed pull request #14729:
URL: https://github.com/apache/flink/pull/14729


   





[GitHub] [flink] godfreyhe commented on pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on pull request #14729:
URL: https://github.com/apache/flink/pull/14729#issuecomment-765887150


   @flinkbot run azure





[GitHub] [flink] flinkbot edited a comment on pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14729:
URL: https://github.com/apache/flink/pull/14729#issuecomment-765399004


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "6357401af97f8c268fccb433c81c8a80cfe270d3",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12381",
       "triggerID" : "6357401af97f8c268fccb433c81c8a80cfe270d3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12402",
       "triggerID" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 6357401af97f8c268fccb433c81c8a80cfe270d3 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12381) 
   * ef724e57e7676c154c8f0d6da93a811e9d411051 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12402) 
   



[GitHub] [flink] flinkbot edited a comment on pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14729:
URL: https://github.com/apache/flink/pull/14729#issuecomment-765399004


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "6357401af97f8c268fccb433c81c8a80cfe270d3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12381",
       "triggerID" : "6357401af97f8c268fccb433c81c8a80cfe270d3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12402",
       "triggerID" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * ef724e57e7676c154c8f0d6da93a811e9d411051 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12402) 
   



[GitHub] [flink] flinkbot commented on pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #14729:
URL: https://github.com/apache/flink/pull/14729#issuecomment-765399004


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "6357401af97f8c268fccb433c81c8a80cfe270d3",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "6357401af97f8c268fccb433c81c8a80cfe270d3",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 6357401af97f8c268fccb433c81c8a80cfe270d3 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] wenlong88 commented on a change in pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
wenlong88 commented on a change in pull request #14729:
URL: https://github.com/apache/flink/pull/14729#discussion_r568542445



##########
File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/StatementSetImpl.java
##########
@@ -100,4 +101,27 @@ public TableResult execute() {
             operations.clear();
         }
     }
+
+    /**
+     * Get the json plan of all statements and Tables as a batch.
+     *
+     * <p>The json plan is the string json representation of an optimized ExecNode plan for the
+     * statements and Tables. An ExecNode plan can be serialized to json plan, and a json plan can
+     * be deserialized to an ExecNode plan.
+     *
+     * <p>NOTES: Only the Blink planner supports this method.
+     *
+     * <p><b>NOTES:</b>: This is an experimental feature now.
+     *
+     * @return the string json representation of an optimized ExecNode plan for the statements and
+     *     Tables.
+     */
+    @Experimental
+    public String getJsonPlan() {
+        try {
+            return tableEnvironment.getJsonPlan(operations);
+        } finally {
+            operations.clear();

Review comment:
       I would prefer to keep the operations here, since this method is more like `explain`.
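
A minimal sketch of the distinction raised in this comment, assuming a hypothetical `OperationBatch` class (it is not Flink's `StatementSetImpl`, and the returned string is a placeholder rather than a real plan): a read-only call such as `getJsonPlan` leaves the buffered operations in place, as `explain` does, while `execute` consumes them.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a statement set; all names are illustrative only. */
final class OperationBatch {
    private final List<String> operations = new ArrayList<>();

    OperationBatch add(String statement) {
        operations.add(statement);
        return this;
    }

    /** Read-only, like explain(): the buffered operations are kept. */
    String getJsonPlan() {
        // Placeholder output; a real plan would describe the optimized nodes.
        return "{\"statements\":" + operations.size() + "}";
    }

    /** Consumes the batch: the buffer is cleared even if execution fails. */
    int execute() {
        try {
            return operations.size();
        } finally {
            operations.clear();
        }
    }

    int size() {
        return operations.size();
    }
}
```

With this shape, a caller can inspect the plan and still execute the same batch afterwards.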

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpec.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import java.io.IOException;
+
+/**
+ * {@link DynamicTableSinkSpec} describes how to serialize/deserialize dynamic table sink table
+ * and create {@link DynamicTableSink} from the deserialization result.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DynamicTableSinkSpec extends CatalogTableSpecBase {
+
+    @JsonIgnore private DynamicTableSink tableSink;
+
+    @JsonCreator

Review comment:
       I think the `JsonCreator` annotation and `DynamicTableSinkSpecJsonDeserializer` cannot work together.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonPlanGenerator.java
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
+import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor;
+import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitorImpl;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MapperFeature;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.jsontype.NamedType;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
+
+import org.reflections.Reflections;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A utility class that can generate json plan based on given {@link ExecNodeGraph} or generate
+ * {@link ExecNodeGraph} based on given json plan.
+ */
+public class ExecNodeGraphJsonPlanGenerator {
+
+    /** Generate json plan based on the given {@link ExecNodeGraph}. */
+    public static String generateJsonPlan(ExecNodeGraph execGraph, SerdeContext serdeCtx)
+            throws IOException {
+        final ObjectMapper mapper = new ObjectMapper();
+        final SimpleModule module = new SimpleModule();
+        registerSerializers(module, serdeCtx);
+        mapper.registerModule(module);
+
+        final StringWriter writer = new StringWriter(1024);
+        try (JsonGenerator gen = mapper.getFactory().createGenerator(writer)) {
+            TopologicalExecNodeGraph topologyGraph = new TopologicalExecNodeGraph(execGraph);
+            gen.writeObject(topologyGraph);
+        }
+
+        return writer.toString();
+    }
+
+    /** Generate {@link ExecNodeGraph} based on the given json plan. */
+    @SuppressWarnings({"rawtypes"})
+    public static ExecNodeGraph generateExecNodeGraph(String jsonPlan, SerdeContext serdeCtx)
+            throws IOException {
+        final ObjectMapper mapper = new ObjectMapper();
+        mapper.configure(MapperFeature.USE_GETTERS_AS_SETTERS, false);
+        final SimpleModule module = new SimpleModule();
+        final Set<Class<? extends ExecNodeBase>> nodeClasses = scanSubClassesOfExecNodeBase();
+        nodeClasses.forEach(c -> module.registerSubtypes(new NamedType(c, c.getCanonicalName())));
+        final TopologicalExecNodeGraphDeContext graphCtx = new TopologicalExecNodeGraphDeContext();
+        registerDeserializers(mapper, module, serdeCtx, graphCtx);
+        mapper.registerModule(module);
+
+        final TopologicalExecNodeGraph topologicalGraph =
+                mapper.readValue(jsonPlan, TopologicalExecNodeGraph.class);
+
+        return topologicalGraph.convertToExecNodeGraph(graphCtx);
+    }
+
+    private static void registerSerializers(SimpleModule module, SerdeContext serdeCtx) {
+        module.addSerializer(new ChangelogModeJsonSerializer());
+    }
+
+    private static void registerDeserializers(
+            ObjectMapper mapper,
+            SimpleModule module,
+            SerdeContext serdeCtx,
+            TopologicalExecNodeGraphDeContext graphCtx) {
+        module.addDeserializer(ExecNode.class, new ExecNodeDeserializer(graphCtx, mapper));
+        module.addDeserializer(
+                DynamicTableSourceSpec.class,
+                new DynamicTableSourceSpec.DynamicTableSourceSpecJsonDeserializer(serdeCtx));
+        module.addDeserializer(
+                DynamicTableSinkSpec.class,
+                new DynamicTableSinkSpec.DynamicTableSinkSpecJsonDeserializer(serdeCtx));
+        module.addDeserializer(ChangelogMode.class, new ChangelogModeJsonDeserializer());
+    }
+
+    /**
+     * The json plan representing pojo class.
+     *
+     * <p>This class can be serialized to json plan, or deserialized from json plan.
+     */
+    public static class TopologicalExecNodeGraph {
+        public static final String FIELD_NAME_FLINK_VERSION = "flinkVersion";
+        public static final String FIELD_NAME_NODES = "nodes";
+
+        @JsonProperty(FIELD_NAME_FLINK_VERSION)
+        private final String flinkVersion;
+
+        @JsonProperty(FIELD_NAME_NODES)
+        private final ExecNode<?>[] topologicalOrderingNodes;
+
+        public TopologicalExecNodeGraph(ExecNodeGraph execGraph) {
+            topologicalOrderingNodes = getAllNodesAsTopologicalOrdering(execGraph);
+            this.flinkVersion = execGraph.getFlinkVersion();
+        }
+
+        @JsonCreator
+        public TopologicalExecNodeGraph(
+                @JsonProperty(FIELD_NAME_FLINK_VERSION) String flinkVersion,
+                @JsonProperty(FIELD_NAME_NODES) ExecNode<?>[] topologicalOrderingNodes) {
+            this.flinkVersion = flinkVersion;
+            this.topologicalOrderingNodes = topologicalOrderingNodes;
+        }
+
+        private ExecNode<?>[] getAllNodesAsTopologicalOrdering(ExecNodeGraph execGraph) {
+            final List<ExecNode<?>> allNodes = new ArrayList<>();
+            final Set<Integer> nodesIds = new HashSet<>();
+            // for quick search
+            final Set<ExecNode<?>> visitedNodes = Sets.newIdentityHashSet();
+
+            final ExecNodeVisitor visitor =
+                    new ExecNodeVisitorImpl() {
+                        @Override
+                        public void visit(ExecNode<?> node) {
+                            if (visitedNodes.contains(node)) {
+                                return;
+                            }
+                            super.visitInputs(node);
+
+                            final int id = node.getId();
+                            if (nodesIds.contains(id)) {
+                                throw new TableException(
+                                        String.format(
+                                                "The id: %s is not unique for ExecNode: %s.\nplease check it.",
+                                                id, node.getDesc()));
+                            }
+
+                            allNodes.add(node);
+                            nodesIds.add(id);
+                            visitedNodes.add(node);
+                        }
+                    };
+
+            execGraph.getRootNodes().forEach(visitor::visit);
+            checkArgument(allNodes.size() == nodesIds.size());
+            return allNodes.toArray(new ExecNode<?>[0]);
+        }
+
+        public ExecNodeGraph convertToExecNodeGraph(TopologicalExecNodeGraphDeContext ctx) {
+            for (ExecNode<?> execNode : topologicalOrderingNodes) {
+                List<ExecNode<?>> inputNodes = ctx.getInputNodes(execNode.getId());
+                ((ExecNodeBase<?>) execNode).setInputNodes(inputNodes);
+            }
+            List<ExecNode<?>> rootNodes = ctx.getRootNodes();
+            return new ExecNodeGraph(rootNodes);
+        }
+    }
+
+    /** JSON deserializer for {@link ExecNodeBase}. */
+    public static class ExecNodeDeserializer extends StdDeserializer<ExecNodeBase<?>> {

Review comment:
       I think it is better to make such deserializers top-level classes instead of inner classes, because they are so important to the serialization.
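
For readers following the quoted diff, the post-order traversal in `getAllNodesAsTopologicalOrdering` can be sketched in isolation. This is a simplified illustration under stated assumptions: `SketchNode` and `TopologicalOrdering` are hypothetical names, the graph is assumed to be acyclic, and the JDK's `IdentityHashMap` replaces the shaded Guava helper used in the patch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

/** Hypothetical node with an id and input nodes; stands in for an ExecNode. */
final class SketchNode {
    final int id;
    final List<SketchNode> inputs = new ArrayList<>();

    SketchNode(int id, SketchNode... inputs) {
        this.id = id;
        Collections.addAll(this.inputs, inputs);
    }
}

final class TopologicalOrdering {
    /** Returns all nodes reachable from the roots, inputs before consumers. */
    static List<SketchNode> order(List<SketchNode> roots) {
        List<SketchNode> ordered = new ArrayList<>();
        // Identity-based set: a shared node is emitted once, by reference.
        Set<SketchNode> visited = Collections.newSetFromMap(new IdentityHashMap<>());
        Set<Integer> ids = new HashSet<>();
        for (SketchNode root : roots) {
            visit(root, visited, ids, ordered);
        }
        return ordered;
    }

    private static void visit(
            SketchNode node, Set<SketchNode> visited, Set<Integer> ids, List<SketchNode> ordered) {
        if (visited.contains(node)) {
            return;
        }
        // Visit the inputs first so that they precede this node in the output.
        for (SketchNode input : node.inputs) {
            visit(input, visited, ids, ordered);
        }
        // Two distinct nodes must never share an id.
        if (!ids.add(node.id)) {
            throw new IllegalStateException("Duplicate node id: " + node.id);
        }
        ordered.add(node);
        visited.add(node);
    }
}
```

Because every node appears after its inputs, a reader of the serialized array can resolve input references by id against nodes it has already seen.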







[GitHub] [flink] godfreyhe commented on a change in pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #14729:
URL: https://github.com/apache/flink/pull/14729#discussion_r564636504



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpec.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import java.io.IOException;
+
+/**
+ * {@link DynamicTableSourceSpec} describes how to serialize/deserialize dynamic table source table
+ * and create {@link DynamicTableSource} from the deserialization result.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DynamicTableSourceSpec extends CatalogTableSpecBase {
+
+    @JsonIgnore private DynamicTableSource tableSource;

Review comment:
       Good catch! We should provide a new interface to serialize the push-down part of the table source. During deserialization, we will build the table source instance first and then apply the push-down to the new table source.
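
The two-step restore described in this comment could look roughly like the following. This is only a sketch: `PushDownSpec`, `LimitPushDownSpec`, `SketchSource` and `SourceRestorer` are hypothetical names invented for illustration and are not interfaces from the pull request or from Flink.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical source; stands in for a freshly created table source. */
final class SketchSource {
    final List<String> appliedPushDowns = new ArrayList<>();
}

/** Hypothetical serializable description of one push-down. */
interface PushDownSpec {
    /** Serialized form that would be written into the json plan. */
    String toJson();

    /** Re-applies the push-down to a newly built source. */
    void apply(SketchSource source);
}

final class LimitPushDownSpec implements PushDownSpec {
    private final long limit;

    LimitPushDownSpec(long limit) {
        this.limit = limit;
    }

    @Override
    public String toJson() {
        return "{\"type\":\"limit\",\"limit\":" + limit + "}";
    }

    @Override
    public void apply(SketchSource source) {
        source.appliedPushDowns.add("limit=" + limit);
    }
}

final class SourceRestorer {
    /** Builds the source first, then replays the recorded push-downs in order. */
    static SketchSource restore(List<? extends PushDownSpec> specs) {
        SketchSource source = new SketchSource();
        for (PushDownSpec spec : specs) {
            spec.apply(source);
        }
        return source;
    }
}
```

Keeping the push-downs as data, rather than serializing the source instance itself, means that only the specs need a stable json format.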







[GitHub] [flink] godfreyhe commented on a change in pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #14729:
URL: https://github.com/apache/flink/pull/14729#discussion_r564636943



##########
File path: flink-table/flink-table-planner-blink/pom.xml
##########
@@ -418,6 +419,11 @@ under the License.
 									<shadedPattern>org.apache.flink.table.shaded.org.codehaus</shadedPattern>
 								</relocation>-->
 
+								<relocation>
+									<pattern>org.reflections</pattern>
+									<shadedPattern>org.apache.flink.calcite.shaded.org.reflections</shadedPattern>

Review comment:
       Typo, I will fix it. `test_table_shaded_dependencies.sh` has a dependency check.







[GitHub] [flink] flinkbot edited a comment on pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14729:
URL: https://github.com/apache/flink/pull/14729#issuecomment-765399004









[GitHub] [flink] flinkbot edited a comment on pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14729:
URL: https://github.com/apache/flink/pull/14729#issuecomment-765399004


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "6357401af97f8c268fccb433c81c8a80cfe270d3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12381",
       "triggerID" : "6357401af97f8c268fccb433c81c8a80cfe270d3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12402",
       "triggerID" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "765887150",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * ef724e57e7676c154c8f0d6da93a811e9d411051 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12402) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14729:
URL: https://github.com/apache/flink/pull/14729#issuecomment-765399004


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "6357401af97f8c268fccb433c81c8a80cfe270d3",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12381",
       "triggerID" : "6357401af97f8c268fccb433c81c8a80cfe270d3",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12402",
       "triggerID" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ef724e57e7676c154c8f0d6da93a811e9d411051",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12402",
       "triggerID" : "765887150",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "5d701796fc88f998c12bc58ada37b0248ea978c7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12432",
       "triggerID" : "5d701796fc88f998c12bc58ada37b0248ea978c7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b39c8c7d0f68afd25fec0cf79b7fdf9f0d8d37d7",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12492",
       "triggerID" : "b39c8c7d0f68afd25fec0cf79b7fdf9f0d8d37d7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8e7fedb102a0d654ecd59f862e9b912471c2386f",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12786",
       "triggerID" : "8e7fedb102a0d654ecd59f862e9b912471c2386f",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * b39c8c7d0f68afd25fec0cf79b7fdf9f0d8d37d7 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12492) 
   * 8e7fedb102a0d654ecd59f862e9b912471c2386f Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12786) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] godfreyhe commented on pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on pull request #14729:
URL: https://github.com/apache/flink/pull/14729#issuecomment-765403029


   cc @wenlong88





[GitHub] [flink] flinkbot edited a comment on pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14729:
URL: https://github.com/apache/flink/pull/14729#issuecomment-765394656


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 8a6de343620a78f1bb9af2a9892a837c7081c704 (Fri May 28 08:16:30 UTC 2021)
   
   **Warnings:**
    * **2 pom.xml files were touched**: Check for build and licensing issues.
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
   
   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] godfreyhe commented on a change in pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #14729:
URL: https://github.com/apache/flink/pull/14729#discussion_r564643450



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
##########
@@ -36,46 +46,97 @@
  *
  * @param <T> The type of the elements that result from this node.
  */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "class")
 public abstract class ExecNodeBase<T> implements ExecNode<T> {
 
-    private final String description;
-    private final List<ExecEdge> inputEdges;
-    private final LogicalType outputType;
+    public static final String FIELD_NAME_ID = "id";
+    public static final String FIELD_NAME_CLASS = "class";
+    public static final String FIELD_NAME_DESCRIPTION = "description";
+    public static final String FIELD_NAME_INPUT_EDGES = "inputEdges";
+    public static final String FIELD_NAME_OUTPUT_TYPE = "outputType";
+    public static final String FILED_NAME_INPUTS = "inputs";
+
+    /** The unique identifier for each ExecNode in the json plan. */
+    @JsonIgnore private final int id;
+
+    @JsonIgnore private final String description;
+
+    @JsonIgnore private final List<ExecEdge> inputEdges;
+
+    @JsonIgnore private final LogicalType outputType;
+
     // TODO remove this field once edge support `source` and `target`,
     //  and then we can get/set `inputNodes` through `inputEdges`.
-    private List<ExecNode<?>> inputNodes;
+    @JsonIgnore private List<ExecNode<?>> inputNodes;
 
-    private transient Transformation<T> transformation;
+    @JsonIgnore private transient Transformation<T> transformation;
 
-    protected ExecNodeBase(List<ExecEdge> inputEdges, LogicalType outputType, String description) {
+    /** This is used to assign a unique ID to every ExecNode. */
+    private static Integer idCounter = 0;
+
+    /** Generate an unique ID for ExecNode. */
+    public static int getNewNodeId() {
+        idCounter++;
+        return idCounter;
+    }
+
+    // used for json creator
+    protected ExecNodeBase(
+            int id, List<ExecEdge> inputEdges, LogicalType outputType, String description) {
+        this.id = id;
         this.inputEdges = new ArrayList<>(checkNotNull(inputEdges));
         this.outputType = checkNotNull(outputType);
         this.description = checkNotNull(description);
     }
 
+    protected ExecNodeBase(List<ExecEdge> inputEdges, LogicalType outputType, String description) {
+        this(getNewNodeId(), inputEdges, outputType, description);
+    }
+
+    @JsonProperty(value = FIELD_NAME_ID)
+    @Override
+    public final int getId() {
+        return id;
+    }
+
+    /** The name used to identify each sub-class in the json plan. */
+    @JsonProperty(value = FIELD_NAME_CLASS, access = JsonProperty.Access.READ_ONLY)
+    public final String getClassName() {
+        return getClass().getCanonicalName();
+    }
+
+    @JsonProperty(value = FIELD_NAME_DESCRIPTION)

Review comment:
       Some properties, such as `getClassName` and `getInputNodes`, have no backing field (`inputNodes` will be removed later), so putting `@JsonProperty` on the `get` methods everywhere gives a more unified style.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] wenlong88 commented on a change in pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
wenlong88 commented on a change in pull request #14729:
URL: https://github.com/apache/flink/pull/14729#discussion_r564170827



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java
##########
@@ -21,39 +21,97 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /** The representation of an edge connecting two {@link ExecNode}. */
 @Internal
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class ExecEdge {
 
     public static final ExecEdge DEFAULT = ExecEdge.builder().build();
 
+    public static final String FIELD_NAME_REQUIRED_SHUFFLE = "requiredShuffle";
+    public static final String FIELD_NAME_DAM_BEHAVIOR = "damBehavior";
+    public static final String FIELD_NAME_PRIORITY = "priority";
+
+    @JsonProperty(FIELD_NAME_REQUIRED_SHUFFLE)
+    @JsonSerialize(using = RequiredShuffleJsonSerializer.class)
+    @JsonDeserialize(using = RequiredShuffleJsonDeserializer.class)

Review comment:
       Why do we need the custom serializer? I think adding JSON annotations to `RequiredShuffle` may be enough.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/CatalogTableSpecBase.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** {@link CatalogTableSpecBase} describes how to serialize/deserialize a catalog table. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CatalogTableSpecBase {
+    public static final String FIELD_NAME_IDENTIFIER = "identifier";
+    public static final String FIELD_NAME_CATALOG_NAME = "catalogName";
+    public static final String FIELD_NAME_DATABASE_NAME = "databaseName";
+    public static final String FIELD_NAME_TABLE_NAME = "tableName";
+
+    public static final String FIELD_NAME_CATALOG_TABLE = "catalogTable";
+    public static final String FIELD_NAME_CONFIGURATION = "configuration";
+
+    @JsonProperty(value = FIELD_NAME_IDENTIFIER, required = true)
+    @JsonSerialize(using = ObjectIdentifierJsonSerializer.class)
+    @JsonDeserialize(using = ObjectIdentifierJsonDeserializer.class)
+    protected final ObjectIdentifier objectIdentifier;
+
+    @JsonProperty(value = FIELD_NAME_CATALOG_TABLE, required = true)
+    @JsonSerialize(using = CatalogTableJsonSerializer.class)
+    @JsonDeserialize(using = CatalogTableJsonDeserializer.class)
+    protected final CatalogTable catalogTable;
+
+    @JsonProperty(value = FIELD_NAME_CONFIGURATION, required = true)
+    @JsonSerialize(using = ReadableConfigJsonSerializer.class)
+    @JsonDeserialize(using = ReadableConfigJsonDeserializer.class)
+    protected final ReadableConfig configuration;

Review comment:
       The configuration here is the table config. I don't think it is a good idea to persist the configuration per table.

##########
File path: flink-table/flink-table-planner-blink/pom.xml
##########
@@ -418,6 +419,11 @@ under the License.
 									<shadedPattern>org.apache.flink.table.shaded.org.codehaus</shadedPattern>
 								</relocation>-->
 
+								<relocation>
+									<pattern>org.reflections</pattern>
+									<shadedPattern>org.apache.flink.calcite.shaded.org.reflections</shadedPattern>

Review comment:
       Why do we need to add this shading? Shouldn't the shaded pattern start with `org.apache.flink.table`?

##########
File path: flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeSerdeTest.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.table.expressions.TimeIntervalUnit;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LegacyTypeInformationType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.SymbolType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampKind;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.TypeInformationRawType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests for {@link LogicalType} serialization and deserialization. */
+@RunWith(Parameterized.class)
+public class LogicalTypeSerdeTest {

Review comment:
       We may also need a coverage test to make sure that every `LogicalType` is supported.
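
A coverage check of this kind could be sketched as follows. This is only an illustration under assumptions: `ToyTypeRoot` and `ToyType` are hypothetical stand-ins, and a real test would presumably iterate over an enum such as `LogicalTypeRoot` (already imported by this test) and compare it against the parameterized test data.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

/** Sketch of a coverage check; ToyTypeRoot and ToyType are hypothetical stand-ins. */
final class TypeCoverageSketch {

    /** Stand-in for an enum that lists every kind of type. */
    enum ToyTypeRoot { BOOLEAN, INTEGER, VARCHAR, ARRAY }

    /** Stand-in for one entry of the parameterized test data. */
    static final class ToyType {
        final ToyTypeRoot root;
        final String text;

        ToyType(ToyTypeRoot root, String text) {
            this.root = root;
            this.text = text;
        }
    }

    /** Returns the roots that no test entry exercises; an empty set means full coverage. */
    static Set<ToyTypeRoot> uncoveredRoots(List<ToyType> testData) {
        Set<ToyTypeRoot> missing = EnumSet.allOf(ToyTypeRoot.class);
        for (ToyType type : testData) {
            missing.remove(type.root);
        }
        return missing;
    }

    public static void main(String[] args) {
        List<ToyType> testData = Arrays.asList(
                new ToyType(ToyTypeRoot.BOOLEAN, "BOOLEAN"),
                new ToyType(ToyTypeRoot.INTEGER, "INT"),
                new ToyType(ToyTypeRoot.VARCHAR, "VARCHAR(10)"));
        // ARRAY has no entry here, so a real test would fail and name the gap.
        System.out.println("Uncovered roots: " + uncoveredRoots(testData));
    }
}
```

Asserting that the returned set is empty would make the test fail whenever a new kind of type is added without a matching serde test case.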

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
##########
@@ -36,46 +46,97 @@
  *
  * @param <T> The type of the elements that result from this node.
  */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "class")
 public abstract class ExecNodeBase<T> implements ExecNode<T> {
 
-    private final String description;
-    private final List<ExecEdge> inputEdges;
-    private final LogicalType outputType;
+    public static final String FIELD_NAME_ID = "id";
+    public static final String FIELD_NAME_CLASS = "class";
+    public static final String FIELD_NAME_DESCRIPTION = "description";
+    public static final String FIELD_NAME_INPUT_EDGES = "inputEdges";
+    public static final String FIELD_NAME_OUTPUT_TYPE = "outputType";
+    public static final String FILED_NAME_INPUTS = "inputs";
+
+    /** The unique identifier for each ExecNode in the json plan. */
+    @JsonIgnore private final int id;
+
+    @JsonIgnore private final String description;
+
+    @JsonIgnore private final List<ExecEdge> inputEdges;
+
+    @JsonIgnore private final LogicalType outputType;
+
     // TODO remove this field once edge support `source` and `target`,
     //  and then we can get/set `inputNodes` through `inputEdges`.
-    private List<ExecNode<?>> inputNodes;
+    @JsonIgnore private List<ExecNode<?>> inputNodes;
 
-    private transient Transformation<T> transformation;
+    @JsonIgnore private transient Transformation<T> transformation;
 
-    protected ExecNodeBase(List<ExecEdge> inputEdges, LogicalType outputType, String description) {
+    /** This is used to assign a unique ID to every ExecNode. */
+    private static Integer idCounter = 0;
+
+    /** Generate an unique ID for ExecNode. */
+    public static int getNewNodeId() {
+        idCounter++;
+        return idCounter;
+    }
+
+    // used for json creator
+    protected ExecNodeBase(
+            int id, List<ExecEdge> inputEdges, LogicalType outputType, String description) {
+        this.id = id;
         this.inputEdges = new ArrayList<>(checkNotNull(inputEdges));
         this.outputType = checkNotNull(outputType);
         this.description = checkNotNull(description);
     }
 
+    protected ExecNodeBase(List<ExecEdge> inputEdges, LogicalType outputType, String description) {
+        this(getNewNodeId(), inputEdges, outputType, description);
+    }
+
+    @JsonProperty(value = FIELD_NAME_ID)
+    @Override
+    public final int getId() {
+        return id;
+    }
+
+    /** The name used to identify each sub-class in the json plan. */
+    @JsonProperty(value = FIELD_NAME_CLASS, access = JsonProperty.Access.READ_ONLY)
+    public final String getClassName() {
+        return getClass().getCanonicalName();
+    }
+
+    @JsonProperty(value = FIELD_NAME_DESCRIPTION)

Review comment:
       Why not just put the annotations on the fields? I think putting them on the `get` methods is not so straightforward.

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/CatalogTableSpecBase.java
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** {@link CatalogTableSpecBase} describes how to serialize/deserialize a catalog table. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CatalogTableSpecBase {

Review comment:
       Move the spec classes to utils, or create a new package for specs?

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/LogicalTypeJsonSerializer.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.SymbolType;
+import org.apache.flink.table.types.logical.TypeInformationRawType;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.table.utils.TypeStringUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+/**
+ * JSON serializer for {@link LogicalType}. refer to {@link LogicalTypeJsonDeserializer} for
+ * deserializer.
+ */
+public class LogicalTypeJsonSerializer extends StdSerializer<LogicalType> {
+    private static final long serialVersionUID = 1L;
+
+    public static final String FIELD_NAME_NULLABLE = "nullable";
+    public static final String FIELD_NAME_SYMBOL_CLASS = "symbolClass";
+    public static final String FIELD_NAME_TYPE_INFO = "typeInfo";
+
+    public LogicalTypeJsonSerializer() {
+        super(LogicalType.class);
+    }
+
+    @Override
+    public void serialize(
+            LogicalType logicalType,
+            JsonGenerator jsonGenerator,
+            SerializerProvider serializerProvider)
+            throws IOException {
+        if (logicalType instanceof SymbolType) {
+            // SymbolType does not support `asSerializableString`
+            SymbolType<?> symbolType = (SymbolType<?>) logicalType;
+            jsonGenerator.writeStartObject();
+            jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, symbolType.isNullable());
+            jsonGenerator.writeStringField(
+                    FIELD_NAME_SYMBOL_CLASS, symbolType.getDefaultConversion().getCanonicalName());
+            jsonGenerator.writeEndObject();
+        } else if (logicalType instanceof TypeInformationRawType) {
+            // TypeInformationRawType does not support `asSerializableString`
+            TypeInformationRawType<?> rawType = (TypeInformationRawType<?>) logicalType;
+            jsonGenerator.writeStartObject();
+            jsonGenerator.writeBooleanField(FIELD_NAME_NULLABLE, rawType.isNullable());
+            jsonGenerator.writeStringField(
+                    FIELD_NAME_TYPE_INFO,
+                    EncodingUtils.escapeSingleQuotes(
+                            TypeStringUtils.writeTypeInfo(rawType.getTypeInformation())));

Review comment:
       How about using Base64 encoding? I think it would be safer than just escaping single quotes.
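
The Base64 suggestion can be illustrated with the JDK's `java.util.Base64`. This is a minimal sketch, not the PR's code: `TypeInfoEncodingSketch` and the sample payload are hypothetical, and the real input would be whatever `TypeStringUtils.writeTypeInfo` returns. The point is that the basic Base64 alphabet contains no quotes, backslashes or control characters, so the encoded value needs no further escaping inside a JSON string.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Sketch: Base64 makes an arbitrary string safe to embed as a JSON string value. */
final class TypeInfoEncodingSketch {

    /** Encodes arbitrary text as Base64 over its UTF-8 bytes. */
    static String encode(String raw) {
        return Base64.getEncoder().encodeToString(raw.getBytes(StandardCharsets.UTF_8));
    }

    /** Reverses {@link #encode(String)}. */
    static String decode(String encoded) {
        return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hypothetical payload with single quotes, double quotes, a backslash and a newline.
        String raw = "ROW<name 'it''s', path \"C:\\tmp\">\nsecond line";
        String encoded = encode(raw);
        System.out.println(encoded);
        // The round trip restores the original text exactly.
        System.out.println(decode(encoded).equals(raw));
    }
}
```

The trade-off is readability: the encoded value is opaque in the JSON plan, whereas the escaped string stays human-readable.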

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpec.java
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.serde;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import java.io.IOException;
+
+/**
+ * {@link DynamicTableSourceSpec} describes how to serialize/deserialize dynamic table source table
+ * and create {@link DynamicTableSource} from the deserialization result.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DynamicTableSourceSpec extends CatalogTableSpecBase {
+
+    @JsonIgnore private DynamicTableSource tableSource;

Review comment:
       I am afraid this will not work well when something has been pushed down into the table source: in that case we cannot rebuild it from the catalog table alone.
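
The concern can be made concrete with a toy model. Everything in this sketch is hypothetical (`ToySource`, `applyProjection`, `rebuildFromCatalog`) and deliberately simpler than Flink's `DynamicTableSource`; it only shows that a source re-created from the catalog table alone loses any state the planner pushed into the original instance.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch only: ToySource is hypothetical and is not Flink's DynamicTableSource. */
final class PushDownRebuildSketch {

    static final class ToySource {
        private final List<String> tableFields;
        private List<String> producedFields;

        ToySource(List<String> tableFields) {
            this.tableFields = tableFields;
            this.producedFields = tableFields; // no push-down yet: produce every field
        }

        /** Simulates the planner pushing a projection into the source. */
        void applyProjection(List<String> fields) {
            if (!tableFields.containsAll(fields)) {
                throw new IllegalArgumentException("Unknown field in " + fields);
            }
            this.producedFields = fields;
        }

        List<String> producedFields() {
            return producedFields;
        }
    }

    /** Stands in for re-creating the source from the persisted catalog table only. */
    static ToySource rebuildFromCatalog(List<String> tableFields) {
        return new ToySource(tableFields);
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("a", "b", "c");
        ToySource planned = new ToySource(fields);
        planned.applyProjection(Arrays.asList("a", "c"));

        // The rebuilt source knows nothing about the projection applied above.
        ToySource rebuilt = rebuildFromCatalog(fields);
        System.out.println("planned: " + planned.producedFields());
        System.out.println("rebuilt: " + rebuilt.producedFields());
    }
}
```

In this toy model the mismatch disappears only if the pushed-down state is persisted alongside the catalog table and re-applied after the source is re-created.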







[GitHub] [flink] flinkbot edited a comment on pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #14729:
URL: https://github.com/apache/flink/pull/14729#issuecomment-765399004


   ## CI report:
   
   * 8e7fedb102a0d654ecd59f862e9b912471c2386f Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=12786) 
   * 8a6de343620a78f1bb9af2a9892a837c7081c704 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] godfreyhe commented on a change in pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #14729:
URL: https://github.com/apache/flink/pull/14729#discussion_r564636943



##########
File path: flink-table/flink-table-planner-blink/pom.xml
##########
@@ -418,6 +419,11 @@ under the License.
 									<shadedPattern>org.apache.flink.table.shaded.org.codehaus</shadedPattern>
 								</relocation>-->
 
+								<relocation>
+									<pattern>org.reflections</pattern>
+									<shadedPattern>org.apache.flink.calcite.shaded.org.reflections</shadedPattern>

Review comment:
       That is a typo; I will fix it.



