Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/27 14:32:32 UTC

[GitHub] [flink] twalthr commented on a change in pull request #18479: [FLINK-25387] Introduce ExecNodeMetadata

twalthr commented on a change in pull request #18479:
URL: https://github.com/apache/flink/pull/18479#discussion_r793606066



##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
##########
@@ -46,18 +46,28 @@
 @JsonIgnoreProperties(ignoreUnknown = true)
 public abstract class ExecNodeBase<T> implements ExecNode<T> {
 
-    /** The unique identifier for each ExecNode in the json plan. */
-    @JsonIgnore private final int id;
+    private final String description;
 
-    @JsonIgnore private final String description;
+    private final LogicalType outputType;
 
-    @JsonIgnore private final LogicalType outputType;
+    private final List<InputProperty> inputProperties;
 
-    @JsonIgnore private final List<InputProperty> inputProperties;
+    private List<ExecEdge> inputEdges;
 
-    @JsonIgnore private List<ExecEdge> inputEdges;
+    private transient Transformation<T> transformation;
 
-    @JsonIgnore private transient Transformation<T> transformation;
+    /** Holds the context information (id, name, version) as de-serialised from a JSON plan. */

Review comment:
       We use American spelling in our code base and documentation: `de-serialised` -> `deserialized`

##########
File path: flink-table/flink-table-planner/pom.xml
##########
@@ -373,8 +355,6 @@ under the License.
 							<!-- For legacy string expressions in Table API -->
 							<include>org.scala-lang.modules:scala-parser-combinators_${scala.binary.version}</include>
 
-							<!-- ReflectionsUtil -->

Review comment:
       remove trailing empty line as well

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
+
+/**
+ * Helper Pojo that holds the necessary identifier fields that are used for JSON plan serialisation
+ * and de-serialisation.
+ */
+public class ExecNodeContext {
+
+    private final int id;
+    private final String name;
+    private final Integer version;
+
+    public ExecNodeContext(int id, String name, Integer version) {
+        this.id = id;
+        this.name = name;
+        this.version = version;
+    }
+
+    public ExecNodeContext(int id) {

Review comment:
       who is calling this?

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
##########
@@ -74,26 +84,21 @@ public static void resetIdCounter() {
         idCounter = 0;
     }
 
-    // used for json creator
     protected ExecNodeBase(
-            int id,
+            ExecNodeContext context,
             List<InputProperty> inputProperties,
             LogicalType outputType,
             String description) {
-        this.id = id;
+        checkNotNull(context);

Review comment:
       do this inline, similar to the other assignments
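
A minimal sketch of the inline form, under stated assumptions: `java.util.Objects.requireNonNull` stands in for Flink's `Preconditions.checkNotNull` (both return the checked reference), and `SketchContext` / `SketchNodeBase` are hypothetical placeholders, not the classes touched by this PR.

```java
import java.util.Objects;

/** Hypothetical stand-in for ExecNodeContext; only carries an id. */
final class SketchContext {
    final int id;

    SketchContext(int id) {
        this.id = id;
    }
}

/** Hypothetical stand-in for ExecNodeBase. */
class SketchNodeBase {
    private final SketchContext context;
    private final String description;

    SketchNodeBase(SketchContext context, String description) {
        // The null check returns its argument, so check and assignment
        // fit in one statement instead of a separate checkNotNull(context) line.
        this.context = Objects.requireNonNull(context, "context must not be null");
        this.description = Objects.requireNonNull(description, "description must not be null");
    }

    SketchContext getContext() {
        return context;
    }

    String getDescription() {
        return description;
    }
}
```

This keeps every constructor argument on one validated assignment line, so a missing check is easy to spot.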

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
+
+/**
+ * Helper Pojo that holds the necessary identifier fields that are used for JSON plan serialisation
+ * and de-serialisation.
+ */
+public class ExecNodeContext {
+
+    private final int id;
+    private final String name;
+    private final Integer version;
+
+    public ExecNodeContext(int id, String name, Integer version) {
+        this.id = id;
+        this.name = name;
+        this.version = version;
+    }
+
+    public ExecNodeContext(int id) {
+        this(id, null, null);
+    }
+
+    @JsonCreator
+    public ExecNodeContext(String value) {
+        String[] split = value.split("_");
+        this.id = Integer.parseInt(split[0]);
+        this.name = split[1];
+        this.version = Integer.valueOf(split[2]);
+    }
+
+    /** The unique identifier for each ExecNode in the JSON plan. */
+    public int getId() {
+        return id;
+    }
+
+    /** The type identifying an ExecNode in the JSON plan. See {@link ExecNodeMetadata#name()}. */
+    public String getName() {
+        return name;
+    }
+
+    /** The version of the ExecNode in the JSON plan. See {@link ExecNodeMetadata#version()}. */
+    public Integer getVersion() {
+        return version;
+    }
+
+    @JsonValue
+    @Override
+    public String toString() {
+        return id + "_" + name + "_" + version;
+    }
+
+    @SuppressWarnings("rawtypes")
+    public static ExecNodeContext newMetadata(Class<? extends ExecNode> execNode) {

Review comment:
       avoid warning suppression:
   ```
   public static <T extends ExecNode<?>> ExecNodeContext newMetadata(Class<T> execNode)
   ```
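
A small, self-contained sketch of why the bounded signature avoids the suppression. The types `SketchExecNode`, `SketchCalc`, and `SketchContextFactory` are hypothetical stand-ins for illustration; the real method returns an `ExecNodeContext`, which is replaced here by a plain `String`.

```java
/** Hypothetical stand-in for the generic ExecNode interface. */
interface SketchExecNode<T> {}

/** Hypothetical concrete node, analogous to a non-generic ExecNode subclass. */
final class SketchCalc implements SketchExecNode<String> {}

final class SketchContextFactory {

    private SketchContextFactory() {
        // no instantiation
    }

    // The bound uses a wildcard (SketchExecNode<?>) instead of the raw type,
    // so the compiler has nothing to warn about and no
    // @SuppressWarnings("rawtypes") is required.
    static <T extends SketchExecNode<?>> String describe(Class<T> execNode) {
        return execNode.getSimpleName();
    }
}
```

Callers keep passing a class literal, for example `SketchContextFactory.describe(SketchCalc.class)`.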

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeMetadata.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation to be used for {@link ExecNode}s to keep necessary metadata when
+ * serialising/deserialising them in a plan.
+ *
+ * <p>Each {@link ExecNode} needs to be annotated and provide the necessary metadata info so that it
+ * can be correctly serialised and later on instantiated from a string (JSON) plan.
+ *
+ * <p>It's possible for one {@link ExecNode} class to user multiple annotations to denote ability to

Review comment:
       `to use`
   
   Also mention the purpose of this annotation: internal bookkeeping across Flink versions and information for our testing infrastructure.

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeMetadata.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation to be used for {@link ExecNode}s to keep necessary metadata when
+ * serialising/deserialising them in a plan.
+ *
+ * <p>Each {@link ExecNode} needs to be annotated and provide the necessary metadata info so that it
+ * can be correctly serialised and later on instantiated from a string (JSON) plan.
+ *
+ * <p>It's possible for one {@link ExecNode} class to user multiple annotations to denote ability to
+ * upgrade to more versions.
+ */
+@Documented
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@PublicEvolving
+public @interface ExecNodeMetadata {
+    // main information
+
+    /**
+     * Unique name of the {@link ExecNode} for serialization/deserialization and uid() generation.
+     * Together with version, uniquely identifies the {@link ExecNode} class.
+     */
+    String name();
+
+    /**
+     * A positive integer denoting the evolving version of an {@link ExecNode}, used for
+     * serialization/deserialization and uid() generation. Together with {@link #name()}, uniquely
+     * identifies the {@link ExecNode} class.
+     */
+    @JsonProperty("version")
+    int version();
+
+    // maintenance information for internal/community/test usage
+
+    /**
+     * Hard coded list of {@link ExecutionConfigOptions} keys of in the Flink version when the
+     * ExecNode was added. Does not reference instances in the {@link ExecutionConfigOptions} class
+     * in case those get refactored.
+     *
+     * <p>Completeness tests can verify that every option is set once in restore and change
+     * detection tests.
+     *
+     * <p>Completeness tests can verify that the ExecutionConfigOptions class still contains an
+     * option (via key or fallback key) for the given key.
+     *
+     * <p>Restore can verify whether the restored ExecNode config map contains only options of the
+     * given keys.
+     */
+    @JsonProperty("consumedOptions")
+    String[] consumedOptions() default {};
+
+    /**
+     * Set of operator names that can be part of the resulting Transformations.

Review comment:
       `{@link`

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
+
+/**
+ * Helper Pojo that holds the necessary identifier fields that are used for JSON plan serialisation
+ * and de-serialisation.
+ */
+public class ExecNodeContext {
+
+    private final int id;
+    private final String name;
+    private final Integer version;
+
+    public ExecNodeContext(int id, String name, Integer version) {
+        this.id = id;
+        this.name = name;
+        this.version = version;
+    }
+
+    public ExecNodeContext(int id) {
+        this(id, null, null);
+    }
+
+    @JsonCreator
+    public ExecNodeContext(String value) {
+        String[] split = value.split("_");
+        this.id = Integer.parseInt(split[0]);
+        this.name = split[1];
+        this.version = Integer.valueOf(split[2]);
+    }
+
+    /** The unique identifier for each ExecNode in the JSON plan. */
+    public int getId() {
+        return id;
+    }
+
+    /** The type identifying an ExecNode in the JSON plan. See {@link ExecNodeMetadata#name()}. */
+    public String getName() {
+        return name;
+    }
+
+    /** The version of the ExecNode in the JSON plan. See {@link ExecNodeMetadata#version()}. */
+    public Integer getVersion() {
+        return version;
+    }
+
+    @JsonValue
+    @Override
+    public String toString() {
+        return id + "_" + name + "_" + version;
+    }
+
+    @SuppressWarnings("rawtypes")
+    public static ExecNodeContext newMetadata(Class<? extends ExecNode> execNode) {
+        return newMetadata(execNode, ExecNodeBase.getNewNodeId());
+    }
+
+    @SuppressWarnings("rawtypes")
+    static ExecNodeContext newMetadata(Class<? extends ExecNode> execNode, int id) {
+        ExecNodeMetadata metadata = ExecNodeMetadataUtil.latestAnnotation(execNode);
+        // Some StreamExecNodes likes StreamExecMultipleInput
+        // still don't support the ExecNodeMetadata annotation.

Review comment:
       why?

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
+
+/**
+ * Helper Pojo that holds the necessary identifier fields that are used for JSON plan serialisation
+ * and de-serialisation.
+ */
+public class ExecNodeContext {

Review comment:
       Add annotations to all your classes.

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeMetadata.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation to be used for {@link ExecNode}s to keep necessary metadata when
+ * serialising/deserialising them in a plan.
+ *
+ * <p>Each {@link ExecNode} needs to be annotated and provide the necessary metadata info so that it
+ * can be correctly serialised and later on instantiated from a string (JSON) plan.
+ *
+ * <p>It's possible for one {@link ExecNode} class to user multiple annotations to denote ability to
+ * upgrade to more versions.
+ */
+@Documented
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@PublicEvolving
+public @interface ExecNodeMetadata {
+    // main information
+
+    /**
+     * Unique name of the {@link ExecNode} for serialization/deserialization and uid() generation.
+     * Together with version, uniquely identifies the {@link ExecNode} class.
+     */
+    String name();
+
+    /**
+     * A positive integer denoting the evolving version of an {@link ExecNode}, used for
+     * serialization/deserialization and uid() generation. Together with {@link #name()}, uniquely
+     * identifies the {@link ExecNode} class.
+     */
+    @JsonProperty("version")
+    int version();
+
+    // maintenance information for internal/community/test usage
+
+    /**
+     * Hard coded list of {@link ExecutionConfigOptions} keys of in the Flink version when the
+     * ExecNode was added. Does not reference instances in the {@link ExecutionConfigOptions} class
+     * in case those get refactored.
+     *
+     * <p>Completeness tests can verify that every option is set once in restore and change
+     * detection tests.
+     *
+     * <p>Completeness tests can verify that the ExecutionConfigOptions class still contains an
+     * option (via key or fallback key) for the given key.
+     *
+     * <p>Restore can verify whether the restored ExecNode config map contains only options of the
+     * given keys.
+     */
+    @JsonProperty("consumedOptions")
+    String[] consumedOptions() default {};
+
+    /**
+     * Set of operator names that can be part of the resulting Transformations.
+     *
+     * <p>Restore and completeness tests can verify there exists at least one test that adds each
+     * operator and that the created Transformations contain only operators with `uid`s containing
+     * the given operator names.
+     *
+     * <p>The concrete combinations or existence of these operators in the final pipeline depends on
+     * various parameters (both configuration and ExecNode-specific arguments such as interval size
+     * etc.).
+     */
+    @JsonProperty("producedOperators")
+    String[] producedOperators() default {};
+
+    /**
+     * Used for plan validation and potentially plan migration.
+     *
+     * <p>Needs to be updated when the JSON for the ExecNode changes: e.g. after adding an attribute
+     * to the JSON spec of the ExecNode.
+     *
+     * <p>The annotation does not need to be updated for every Flink version. As the name suggests
+     * it is about the "minimum" version for a restore. If the minimum version is higher than the
+     * current Flink version, plan migration is necessary.
+     *
+     * <p>Changing this version will always result in a new ExecNode {@link #version()}.
+     *
+     * <p>Plan migration tests can use this information.
+     *
+     * <p>Completeness tests can verify that restore tests exist for all JSON plan variations.
+     */
+    @JsonProperty("minPlanVersion")
+    FlinkVersion minPlanVersion();
+
+    /**
+     * Used for operator and potentially savepoint migration.
+     *
+     * <p>Needs to be updated whenever the state layout of an ExecNode changes. In some cases, the

Review comment:
       This might need some refinement. So do I need to update the version if I use operator migration?

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecCalc.java
##########
@@ -42,11 +43,11 @@ public BatchExecCalc(
             RowType outputType,
             String description) {
         super(
+                ExecNodeContext.newMetadata(BatchExecCalc.class),

Review comment:
       rename to `newContext`, we don't construct metadata here

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeMetadata.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation to be used for {@link ExecNode}s to keep necessary metadata when
+ * serialising/deserialising them in a plan.
+ *
+ * <p>Each {@link ExecNode} needs to be annotated and provide the necessary metadata info so that it
+ * can be correctly serialised and later on instantiated from a string (JSON) plan.
+ *
+ * <p>It's possible for one {@link ExecNode} class to user multiple annotations to denote ability to
+ * upgrade to more versions.
+ */
+@Documented
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@PublicEvolving
+public @interface ExecNodeMetadata {
+    // main information
+
+    /**
+     * Unique name of the {@link ExecNode} for serialization/deserialization and uid() generation.
+     * Together with version, uniquely identifies the {@link ExecNode} class.
+     */
+    String name();
+
+    /**
+     * A positive integer denoting the evolving version of an {@link ExecNode}, used for
+     * serialization/deserialization and uid() generation. Together with {@link #name()}, uniquely
+     * identifies the {@link ExecNode} class.
+     */
+    @JsonProperty("version")
+    int version();
+
+    // maintenance information for internal/community/test usage
+
+    /**
+     * Hard coded list of {@link ExecutionConfigOptions} keys of in the Flink version when the
+     * ExecNode was added. Does not reference instances in the {@link ExecutionConfigOptions} class
+     * in case those get refactored.
+     *
+     * <p>Completeness tests can verify that every option is set once in restore and change
+     * detection tests.
+     *
+     * <p>Completeness tests can verify that the ExecutionConfigOptions class still contains an
+     * option (via key or fallback key) for the given key.
+     *
+     * <p>Restore can verify whether the restored ExecNode config map contains only options of the
+     * given keys.
+     */
+    @JsonProperty("consumedOptions")
+    String[] consumedOptions() default {};
+
+    /**
+     * Set of operator names that can be part of the resulting Transformations.

Review comment:
       btw for `uid` we can also always link to `Transformation#uid`

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.utils;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadatas;
+import org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecChangelogNormalize;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCorrelate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDeduplicate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDropUpdateBefore;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExpand;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalGroupAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalWindowAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGroupAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGroupWindowAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecIncrementalGroupAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecIntervalJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLimit;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalGroupAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalWindowAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLookupJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMatch;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMiniBatchAssigner;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCalc;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCorrelate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupWindowAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonOverAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecRank;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSortLimit;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalSort;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecUnion;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecValues;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowDeduplicate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowRank;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowTableFunction;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/** Utility class for ExecNodeMetadata related functionality. */
+public final class ExecNodeMetadataUtil {
+
+    private ExecNodeMetadataUtil() {
+        // no instantiation
+    }
+
+    private static final Map<ExecNodeNameVersion, Class<? extends ExecNode<?>>> lookupMap =
+            new HashMap<>();
+
+    private static final Set<Class<? extends ExecNode<?>>> execNodes = new HashSet<>();
+
+    static {
+        execNodes.add(StreamExecCalc.class);
+        execNodes.add(StreamExecChangelogNormalize.class);
+        execNodes.add(StreamExecCorrelate.class);
+        execNodes.add(StreamExecDeduplicate.class);
+        execNodes.add(StreamExecDropUpdateBefore.class);
+        execNodes.add(StreamExecExchange.class);
+        execNodes.add(StreamExecExpand.class);
+        execNodes.add(StreamExecGlobalGroupAggregate.class);
+        execNodes.add(StreamExecGlobalWindowAggregate.class);
+        execNodes.add(StreamExecGroupAggregate.class);
+        execNodes.add(StreamExecGroupWindowAggregate.class);
+        execNodes.add(StreamExecIncrementalGroupAggregate.class);
+        execNodes.add(StreamExecIntervalJoin.class);
+        execNodes.add(StreamExecJoin.class);
+        execNodes.add(StreamExecLimit.class);
+        execNodes.add(StreamExecLocalGroupAggregate.class);
+        execNodes.add(StreamExecLocalWindowAggregate.class);
+        execNodes.add(StreamExecLookupJoin.class);
+        execNodes.add(StreamExecMatch.class);
+        execNodes.add(StreamExecMiniBatchAssigner.class);
+        execNodes.add(StreamExecOverAggregate.class);
+        execNodes.add(StreamExecPythonCalc.class);
+        execNodes.add(StreamExecPythonCorrelate.class);
+        execNodes.add(StreamExecPythonGroupAggregate.class);
+        execNodes.add(StreamExecPythonGroupWindowAggregate.class);
+        execNodes.add(StreamExecPythonOverAggregate.class);
+        execNodes.add(StreamExecRank.class);
+        execNodes.add(StreamExecSink.class);
+        execNodes.add(StreamExecSortLimit.class);
+        execNodes.add(StreamExecTableSourceScan.class);
+        execNodes.add(StreamExecTemporalJoin.class);
+        execNodes.add(StreamExecTemporalSort.class);
+        execNodes.add(StreamExecUnion.class);
+        execNodes.add(StreamExecValues.class);
+        execNodes.add(StreamExecWatermarkAssigner.class);
+        execNodes.add(StreamExecWindowAggregate.class);
+        execNodes.add(StreamExecWindowDeduplicate.class);
+        execNodes.add(StreamExecWindowJoin.class);
+        execNodes.add(StreamExecWindowRank.class);
+        execNodes.add(StreamExecWindowTableFunction.class);
+    }
+
+    static {
+        for (Class<? extends ExecNode<?>> execNodeClass : execNodes) {
+            addToLookupMap(execNodeClass);
+        }
+    }
+
+    public static Set<Class<? extends ExecNode<?>>> execNodes() {
+        return execNodes;
+    }
+
+    public static Class<? extends ExecNode<?>> retrieveExecNode(String name, int version) {
+        return lookupMap.get(new ExecNodeNameVersion(name, version));
+    }
+
+    @VisibleForTesting
+    static void addTestNode(Class<? extends ExecNode<?>> execNodeClass) {
+        addToLookupMap(execNodeClass);
+    }
+
+    @SuppressWarnings("rawtypes")
+    private static List<ExecNodeMetadata> extractMetadataFromAnnotation(
+            Class<? extends ExecNode> execNodeClass) {
+        List<ExecNodeMetadata> metadata = new ArrayList<>();
+        ExecNodeMetadata annotation = execNodeClass.getDeclaredAnnotation(ExecNodeMetadata.class);
+        if (annotation != null) {
+            metadata.add(annotation);
+        }
+
+        ExecNodeMetadatas annotations =

Review comment:
       Can't we skip the check above? Doesn't the multi-annotation already include the single one, or vice versa?
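       A minimal sketch of the idea behind this question, under one assumption: the metadata annotation is declared `@Repeatable`, which the excerpt above does not show. In that case `Class#getDeclaredAnnotationsByType` returns the single annotation and the ones wrapped in the container alike, so one lookup covers both cases. `NodeMeta`, `NodeMetas` and the node classes are hypothetical stand-ins, not the PR's types.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class RepeatableLookupSketch {

    // Hypothetical stand-in for ExecNodeMetadata, declared repeatable for this sketch.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @Repeatable(NodeMetas.class)
    @interface NodeMeta {
        String name();

        int version();
    }

    // Hypothetical container annotation (the role ExecNodeMetadatas plays in the PR).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface NodeMetas {
        NodeMeta[] value();
    }

    @NodeMeta(name = "single", version = 1)
    static class SingleNode {}

    // The compiler wraps the two annotations below in an implicit @NodeMetas container.
    @NodeMeta(name = "multi", version = 1)
    @NodeMeta(name = "multi", version = 2)
    static class MultiNode {}

    static class PlainNode {}

    // One lookup handles the single case, the repeated case, and the absent case.
    static int countMetadata(Class<?> clazz) {
        return clazz.getDeclaredAnnotationsByType(NodeMeta.class).length;
    }

    public static void main(String[] args) {
        System.out.println(countMetadata(SingleNode.class)); // 1
        System.out.println(countMetadata(MultiNode.class)); // 2
        System.out.println(countMetadata(PlainNode.class)); // 0
    }
}
```

       With a plain (non-repeatable) annotation plus a hand-written container, the two lookups stay independent and a check like the one in the PR is still needed.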

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.utils;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadatas;
+import org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecChangelogNormalize;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCorrelate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDeduplicate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDropUpdateBefore;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExpand;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalGroupAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalWindowAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGroupAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGroupWindowAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecIncrementalGroupAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecIntervalJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLimit;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalGroupAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalWindowAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLookupJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMatch;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMiniBatchAssigner;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCalc;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCorrelate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupWindowAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonOverAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecRank;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSortLimit;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalSort;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecUnion;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecValues;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowDeduplicate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowRank;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowTableFunction;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/** Utility class for ExecNodeMetadata related functionality. */
+public final class ExecNodeMetadataUtil {
+
+    private ExecNodeMetadataUtil() {
+        // no instantiation
+    }
+
+    private static final Map<ExecNodeNameVersion, Class<? extends ExecNode<?>>> lookupMap =
+            new HashMap<>();
+
+    private static final Set<Class<? extends ExecNode<?>>> execNodes = new HashSet<>();
+
+    static {
+        execNodes.add(StreamExecCalc.class);
+        execNodes.add(StreamExecChangelogNormalize.class);
+        execNodes.add(StreamExecCorrelate.class);
+        execNodes.add(StreamExecDeduplicate.class);
+        execNodes.add(StreamExecDropUpdateBefore.class);
+        execNodes.add(StreamExecExchange.class);
+        execNodes.add(StreamExecExpand.class);
+        execNodes.add(StreamExecGlobalGroupAggregate.class);
+        execNodes.add(StreamExecGlobalWindowAggregate.class);
+        execNodes.add(StreamExecGroupAggregate.class);
+        execNodes.add(StreamExecGroupWindowAggregate.class);
+        execNodes.add(StreamExecIncrementalGroupAggregate.class);
+        execNodes.add(StreamExecIntervalJoin.class);
+        execNodes.add(StreamExecJoin.class);
+        execNodes.add(StreamExecLimit.class);
+        execNodes.add(StreamExecLocalGroupAggregate.class);
+        execNodes.add(StreamExecLocalWindowAggregate.class);
+        execNodes.add(StreamExecLookupJoin.class);
+        execNodes.add(StreamExecMatch.class);
+        execNodes.add(StreamExecMiniBatchAssigner.class);
+        execNodes.add(StreamExecOverAggregate.class);
+        execNodes.add(StreamExecPythonCalc.class);
+        execNodes.add(StreamExecPythonCorrelate.class);
+        execNodes.add(StreamExecPythonGroupAggregate.class);
+        execNodes.add(StreamExecPythonGroupWindowAggregate.class);
+        execNodes.add(StreamExecPythonOverAggregate.class);
+        execNodes.add(StreamExecRank.class);
+        execNodes.add(StreamExecSink.class);
+        execNodes.add(StreamExecSortLimit.class);
+        execNodes.add(StreamExecTableSourceScan.class);
+        execNodes.add(StreamExecTemporalJoin.class);
+        execNodes.add(StreamExecTemporalSort.class);
+        execNodes.add(StreamExecUnion.class);
+        execNodes.add(StreamExecValues.class);
+        execNodes.add(StreamExecWatermarkAssigner.class);
+        execNodes.add(StreamExecWindowAggregate.class);
+        execNodes.add(StreamExecWindowDeduplicate.class);
+        execNodes.add(StreamExecWindowJoin.class);
+        execNodes.add(StreamExecWindowRank.class);
+        execNodes.add(StreamExecWindowTableFunction.class);
+    }
+
+    static {
+        for (Class<? extends ExecNode<?>> execNodeClass : execNodes) {
+            addToLookupMap(execNodeClass);
+        }
+    }
+
+    public static Set<Class<? extends ExecNode<?>>> execNodes() {
+        return execNodes;
+    }
+
+    public static Class<? extends ExecNode<?>> retrieveExecNode(String name, int version) {
+        return lookupMap.get(new ExecNodeNameVersion(name, version));
+    }
+
+    @VisibleForTesting
+    static void addTestNode(Class<? extends ExecNode<?>> execNodeClass) {
+        addToLookupMap(execNodeClass);
+    }
+
+    @SuppressWarnings("rawtypes")
+    private static List<ExecNodeMetadata> extractMetadataFromAnnotation(
+            Class<? extends ExecNode> execNodeClass) {
+        List<ExecNodeMetadata> metadata = new ArrayList<>();
+        ExecNodeMetadata annotation = execNodeClass.getDeclaredAnnotation(ExecNodeMetadata.class);
+        if (annotation != null) {
+            metadata.add(annotation);
+        }
+
+        ExecNodeMetadatas annotations =
+                execNodeClass.getDeclaredAnnotation(ExecNodeMetadatas.class);
+        if (metadata.isEmpty()) {
+            if (annotations != null) {
+                for (ExecNodeMetadata annot : annotations.value()) {
+                    if (annot != null) {
+                        metadata.add(annot);
+                    }
+                }
+            }
+        } else {
+            if (annotations != null) {
+                throw new IllegalStateException(
+                        String.format(
+                                "ExecNode: %s is annotated both with %s and %s. This is a bug, please contact developers.",
+                                execNodeClass.getCanonicalName(),
+                                ExecNodeMetadata.class,
+                                ExecNodeMetadatas.class));
+            }
+        }
+        return metadata;
+    }
+
+    private static void addToLookupMap(Class<? extends ExecNode<?>> execNodeClass) {
+        if (!JsonSerdeUtil.hasJsonCreatorAnnotation(execNodeClass)) {
+            throw new IllegalStateException(
+                    String.format(
+                            "ExecNode: %s does not implement @JsonCreator annotation on constructor. This is a bug, please contact developers.",

Review comment:
       I also don't think that users will ever see this; it is an exception for implementers. We don't need to add `This is a bug, please contact developers.` to every exception.

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
+
+/**
+ * Helper Pojo that holds the necessary identifier fields that are used for JSON plan serialisation

Review comment:
       Don't use the word `Pojo` in JavaDocs. Every class is kind of a POJO; the term is rather informal and better suited to discussions.

##########
File path: flink-table/flink-table-planner/src/test/resources/jsonplan/testGetJsonPlan.out
##########
@@ -2,7 +2,7 @@
    "flinkVersion":"",
    "nodes":[
       {
-         "class":"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan",
+         "context": "1_stream-exec-table-source-scan_1",

Review comment:
       Call it `id`, independent of the outcome of our discussion.
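       For readers of the hunk above: the value `1_stream-exec-table-source-scan_1` looks like it packs the three context fields (id, name, version) into one string. The sketch below splits such a value on its first and last underscore. The `<id>_<name>_<version>` layout is an assumption inferred from that single example, and `ContextStringSketch` and `Parts` are hypothetical names, not the PR's `ExecNodeContext`.

```java
public class ContextStringSketch {

    /** Hypothetical holder for the three parts; not the PR's ExecNodeContext. */
    static final class Parts {
        final int id;
        final String name;
        final int version;

        Parts(int id, String name, int version) {
            this.id = id;
            this.name = name;
            this.version = version;
        }
    }

    // Assumed layout: <id>_<name>_<version>; the name itself contains no underscores
    // in the example, but splitting on the first and last underscore tolerates them.
    static Parts parse(String value) {
        int first = value.indexOf('_');
        int last = value.lastIndexOf('_');
        if (first < 0 || first == last) {
            throw new IllegalArgumentException(
                    "Expected <id>_<name>_<version> but got: " + value);
        }
        int id = Integer.parseInt(value.substring(0, first));
        String name = value.substring(first + 1, last);
        int version = Integer.parseInt(value.substring(last + 1));
        return new Parts(id, name, version);
    }

    public static void main(String[] args) {
        Parts p = parse("1_stream-exec-table-source-scan_1");
        System.out.println(p.id + " " + p.name + " " + p.version);
    }
}
```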

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
+
+/**
+ * Helper Pojo that holds the necessary identifier fields that are used for JSON plan serialisation

Review comment:
       Add more JavaDocs on how it is instantiated and why it is important.

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
##########
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.utils;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadatas;
+import org.apache.flink.table.planner.plan.nodes.exec.serde.JsonSerdeUtil;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecChangelogNormalize;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCorrelate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDeduplicate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDropUpdateBefore;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExpand;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalGroupAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalWindowAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGroupAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGroupWindowAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecIncrementalGroupAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecIntervalJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLimit;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalGroupAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalWindowAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLookupJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMatch;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMiniBatchAssigner;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCalc;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCorrelate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupWindowAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonOverAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecRank;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSortLimit;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalSort;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecUnion;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecValues;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowDeduplicate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowRank;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowTableFunction;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/** Utility class for ExecNodeMetadata related functionality. */
+public final class ExecNodeMetadataUtil {
+
+    private ExecNodeMetadataUtil() {
+        // no instantiation
+    }
+
+    private static final Map<ExecNodeNameVersion, Class<? extends ExecNode<?>>> lookupMap =
+            new HashMap<>();
+
+    private static final Set<Class<? extends ExecNode<?>>> execNodes = new HashSet<>();
+
+    static {
+        execNodes.add(StreamExecCalc.class);
+        execNodes.add(StreamExecChangelogNormalize.class);
+        execNodes.add(StreamExecCorrelate.class);
+        execNodes.add(StreamExecDeduplicate.class);
+        execNodes.add(StreamExecDropUpdateBefore.class);
+        execNodes.add(StreamExecExchange.class);
+        execNodes.add(StreamExecExpand.class);
+        execNodes.add(StreamExecGlobalGroupAggregate.class);
+        execNodes.add(StreamExecGlobalWindowAggregate.class);
+        execNodes.add(StreamExecGroupAggregate.class);
+        execNodes.add(StreamExecGroupWindowAggregate.class);
+        execNodes.add(StreamExecIncrementalGroupAggregate.class);
+        execNodes.add(StreamExecIntervalJoin.class);
+        execNodes.add(StreamExecJoin.class);
+        execNodes.add(StreamExecLimit.class);
+        execNodes.add(StreamExecLocalGroupAggregate.class);
+        execNodes.add(StreamExecLocalWindowAggregate.class);
+        execNodes.add(StreamExecLookupJoin.class);
+        execNodes.add(StreamExecMatch.class);
+        execNodes.add(StreamExecMiniBatchAssigner.class);
+        execNodes.add(StreamExecOverAggregate.class);
+        execNodes.add(StreamExecPythonCalc.class);
+        execNodes.add(StreamExecPythonCorrelate.class);
+        execNodes.add(StreamExecPythonGroupAggregate.class);
+        execNodes.add(StreamExecPythonGroupWindowAggregate.class);
+        execNodes.add(StreamExecPythonOverAggregate.class);
+        execNodes.add(StreamExecRank.class);
+        execNodes.add(StreamExecSink.class);
+        execNodes.add(StreamExecSortLimit.class);
+        execNodes.add(StreamExecTableSourceScan.class);
+        execNodes.add(StreamExecTemporalJoin.class);
+        execNodes.add(StreamExecTemporalSort.class);
+        execNodes.add(StreamExecUnion.class);
+        execNodes.add(StreamExecValues.class);
+        execNodes.add(StreamExecWatermarkAssigner.class);
+        execNodes.add(StreamExecWindowAggregate.class);
+        execNodes.add(StreamExecWindowDeduplicate.class);
+        execNodes.add(StreamExecWindowJoin.class);
+        execNodes.add(StreamExecWindowRank.class);
+        execNodes.add(StreamExecWindowTableFunction.class);
+    }
+
+    static {
+        for (Class<? extends ExecNode<?>> execNodeClass : execNodes) {
+            addToLookupMap(execNodeClass);
+        }
+    }
+
+    public static Set<Class<? extends ExecNode<?>>> execNodes() {
+        return execNodes;
+    }
+
+    public static Class<? extends ExecNode<?>> retrieveExecNode(String name, int version) {
+        return lookupMap.get(new ExecNodeNameVersion(name, version));
+    }
+
+    @VisibleForTesting
+    static void addTestNode(Class<? extends ExecNode<?>> execNodeClass) {
+        addToLookupMap(execNodeClass);
+    }
+
+    @SuppressWarnings("rawtypes")
+    private static List<ExecNodeMetadata> extractMetadataFromAnnotation(
+            Class<? extends ExecNode> execNodeClass) {
+        List<ExecNodeMetadata> metadata = new ArrayList<>();
+        ExecNodeMetadata annotation = execNodeClass.getDeclaredAnnotation(ExecNodeMetadata.class);
+        if (annotation != null) {
+            metadata.add(annotation);
+        }
+
+        ExecNodeMetadatas annotations =
+                execNodeClass.getDeclaredAnnotation(ExecNodeMetadatas.class);
+        if (metadata.isEmpty()) {
+            if (annotations != null) {
+                for (ExecNodeMetadata annot : annotations.value()) {
+                    if (annot != null) {
+                        metadata.add(annot);
+                    }
+                }
+            }
+        } else {
+            if (annotations != null) {
+                throw new IllegalStateException(
+                        String.format(
+                                "ExecNode: %s is annotated both with %s and %s. This is a bug, please contact developers.",

Review comment:
       For the future, let's use `This is a bug, please file an issue.`. I don't wanna have mails in my private inbox :D

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeMetadatas.java
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Helper annotation to enable multiple {@link ExecNodeMetadata} annotations on an {@link ExecNode}
+ * class.
+ */
+@PublicEvolving
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE, ElementType.METHOD})
+public @interface ExecNodeMetadatas {

Review comment:
       Every IDE will show a typo here. Rename to `MultipleExecNodeMetadata` or a similar name. Usually the JavaDoc is a good indicator for a name ;-)

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeMetadata.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation to be used for {@link ExecNode}s to keep necessary metadata when
+ * serialising/deserialising them in a plan.
+ *
+ * <p>Each {@link ExecNode} needs to be annotated and provide the necessary metadata info so that it
+ * can be correctly serialised and later on instantiated from a string (JSON) plan.
+ *
+ * <p>It's possible for one {@link ExecNode} class to user multiple annotations to denote ability to
+ * upgrade to more versions.
+ */
+@Documented
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@PublicEvolving
+public @interface ExecNodeMetadata {
+    // main information
+
+    /**
+     * Unique name of the {@link ExecNode} for serialization/deserialization and uid() generation.
+     * Together with version, uniquely identifies the {@link ExecNode} class.
+     */
+    String name();
+
+    /**
+     * A positive integer denoting the evolving version of an {@link ExecNode}, used for
+     * serialization/deserialization and uid() generation. Together with {@link #name()}, uniquely
+     * identifies the {@link ExecNode} class.
+     */
+    @JsonProperty("version")
+    int version();
+
+    // maintenance information for internal/community/test usage
+
+    /**
+     * Hard coded list of {@link ExecutionConfigOptions} keys of in the Flink version when the
+     * ExecNode was added. Does not reference instances in the {@link ExecutionConfigOptions} class
+     * in case those get refactored.
+     *
+     * <p>Completeness tests can verify that every option is set once in restore and change
+     * detection tests.
+     *
+     * <p>Completeness tests can verify that the ExecutionConfigOptions class still contains an
+     * option (via key or fallback key) for the given key.
+     *
+     * <p>Restore can verify whether the restored ExecNode config map contains only options of the
+     * given keys.
+     */
+    @JsonProperty("consumedOptions")
+    String[] consumedOptions() default {};
+
+    /**
+     * Set of operator names that can be part of the resulting Transformations.
+     *
+     * <p>Restore and completeness tests can verify there exists at least one test that adds each
+     * operator and that the created Transformations contain only operators with `uid`s containing
+     * the given operator names.
+     *
+     * <p>The concrete combinations or existence of these operators in the final pipeline depends on
+     * various parameters (both configuration and ExecNode-specific arguments such as interval size
+     * etc.).
+     */
+    @JsonProperty("producedOperators")

Review comment:
       We don't need to convert this POJO into JSON, do we?

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonCalc.java
##########
@@ -43,20 +42,20 @@ public BatchExecPythonCalc(
             RowType outputType,
             String description) {
         this(
+                ExecNodeContext.newMetadata(BatchExecPythonCalc.class),
                 projection,
-                getNewNodeId(),
                 Collections.singletonList(inputProperty),
                 outputType,
                 description);
     }
 
     @JsonCreator
     public BatchExecPythonCalc(
+            @JsonProperty(FIELD_NAME_CONTEXT) ExecNodeContext context,

Review comment:
       Why does a batch exec node already have JSON annotations? Is this really required for Python?

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeMetadata.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation to be used for {@link ExecNode}s to keep necessary metadata when
+ * serialising/deserialising them in a plan.
+ *
+ * <p>Each {@link ExecNode} needs to be annotated and provide the necessary metadata info so that it
+ * can be correctly serialised and later on instantiated from a string (JSON) plan.
+ *
+ * <p>It's possible for one {@link ExecNode} class to user multiple annotations to denote ability to
+ * upgrade to more versions.
+ */
+@Documented
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@PublicEvolving
+public @interface ExecNodeMetadata {
+    // main information
+
+    /**
+     * Unique name of the {@link ExecNode} for serialization/deserialization and uid() generation.
+     * Together with version, uniquely identifies the {@link ExecNode} class.
+     */
+    String name();
+
+    /**
+     * A positive integer denoting the evolving version of an {@link ExecNode}, used for
+     * serialization/deserialization and uid() generation. Together with {@link #name()}, uniquely
+     * identifies the {@link ExecNode} class.
+     */
+    @JsonProperty("version")
+    int version();
+
+    // maintenance information for internal/community/test usage
+
+    /**
+     * Hard coded list of {@link ExecutionConfigOptions} keys of in the Flink version when the
+     * ExecNode was added. Does not reference instances in the {@link ExecutionConfigOptions} class
+     * in case those get refactored.
+     *
+     * <p>Completeness tests can verify that every option is set once in restore and change
+     * detection tests.
+     *
+     * <p>Completeness tests can verify that the ExecutionConfigOptions class still contains an

Review comment:
       Use `{@link ExecutionConfigOptions}` here.

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java
##########
@@ -136,30 +142,30 @@ public StreamExecGroupWindowAggregate(
             RowType outputType,
             String description) {
         this(
+                ExecNodeContext.newMetadata(StreamExecGroupWindowAggregate.class),

Review comment:
       Btw, this method is also a great location for checking whether the class has already been added to the list of classes in `ExecNodeMetadataUtil`. For batch nodes, this could be a list of added but unsupported classes.
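
       A minimal sketch of what such a check could look like, assuming a factory method that reads the annotation via reflection and consults two hard-coded sets. All names below (`NodeMetadata`, `MetadataRegistrySketch`, `newContext`, the node classes, and the `name_version` string format) are hypothetical stand-ins for illustration and are not the classes or signatures in this PR:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Set;

// Hypothetical stand-in for the annotation under review; not Flink's class.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface NodeMetadata {
    String name();

    int version();
}

// Hypothetical node classes used only to exercise the check.
@NodeMetadata(name = "stream-exec-example", version = 1)
class RegisteredNode {}

@NodeMetadata(name = "batch-exec-example", version = 1)
class UnsupportedBatchNode {}

@NodeMetadata(name = "forgotten-example", version = 1)
class ForgottenNode {}

final class MetadataRegistrySketch {
    // Assumption: the utility class keeps explicit lists of known node classes.
    static final Set<Class<?>> SUPPORTED = Set.of(RegisteredNode.class);
    static final Set<Class<?>> ADDED_BUT_UNSUPPORTED = Set.of(UnsupportedBatchNode.class);

    private MetadataRegistrySketch() {}

    /** Builds an identifier from the annotation and fails fast for unlisted classes. */
    static String newContext(Class<?> nodeClass) {
        NodeMetadata metadata = nodeClass.getAnnotation(NodeMetadata.class);
        if (metadata == null) {
            throw new IllegalStateException(
                    nodeClass.getSimpleName() + " is missing the metadata annotation.");
        }
        if (!SUPPORTED.contains(nodeClass) && !ADDED_BUT_UNSUPPORTED.contains(nodeClass)) {
            throw new IllegalStateException(
                    nodeClass.getSimpleName() + " has not been added to any list of known classes.");
        }
        // Name and version together identify the node class.
        return metadata.name() + "_" + metadata.version();
    }
}
```

       With this shape, a newly introduced node class fails on first construction until someone adds it to one of the two lists, so a forgotten registration would show up in any test that instantiates the node.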



