Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/26 15:55:46 UTC

[GitHub] [flink] godfreyhe commented on a change in pull request #14729: [FLINK-21092][FLINK-21093][FLINK-21094][FLINK-21096][table-planner-blink] Support ExecNode plan serialization/deserialization for `INSERT INTO MySink SELECT * FROM MyTable`

godfreyhe commented on a change in pull request #14729:
URL: https://github.com/apache/flink/pull/14729#discussion_r564623272



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java
##########
@@ -21,39 +21,97 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /** The representation of an edge connecting two {@link ExecNode}. */
 @Internal
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class ExecEdge {
 
     public static final ExecEdge DEFAULT = ExecEdge.builder().build();
 
+    public static final String FIELD_NAME_REQUIRED_SHUFFLE = "requiredShuffle";
+    public static final String FIELD_NAME_DAM_BEHAVIOR = "damBehavior";
+    public static final String FIELD_NAME_PRIORITY = "priority";
+
+    @JsonProperty(FIELD_NAME_REQUIRED_SHUFFLE)
+    @JsonSerialize(using = RequiredShuffleJsonSerializer.class)
+    @JsonDeserialize(using = RequiredShuffleJsonDeserializer.class)

Review comment:
       The JSON annotations alone are enough for ExecEdge for now, but I am doing some refactoring in #14757, after which it will need a specific serializer/deserializer. I can revert them here and reintroduce them after #14757 is finished.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org