Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/25 17:15:16 UTC

[GitHub] [flink] slinkydeveloper commented on a change in pull request #18479: [FLINK-25387] Introduce ExecNodeMetadata

slinkydeveloper commented on a change in pull request #18479:
URL: https://github.com/apache/flink/pull/18479#discussion_r791883246



##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
##########
@@ -21,8 +21,7 @@
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.planner.plan.logical.LogicalWindow;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-import org.apache.flink.table.planner.plan.utils.ReflectionsUtil;

Review comment:
       Given you removed this, can you check whether we can remove the reflections dependency once and for all?

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeTypeIdResolver.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DatabindContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase;
+
+/**
+ * Helper class to implement the Jackson subtype serialisation/de-serialisation. Instead of using
+ * the class name use the {@link ExecNodeMetadata#name()} and {@link ExecNodeMetadata#version()} to
+ * perform a lookup in a static map residing in {@link ExecNodeMetadataUtil}.
+ */
+public class ExecNodeTypeIdResolver extends TypeIdResolverBase {
+
+    private JavaType superType;
+
+    @Override
+    public void init(JavaType baseType) {
+        superType = baseType;
+    }
+
+    @Override
+    public Id getMechanism() {
+        return Id.NAME;
+    }
+
+    @Override
+    public String idFromValue(Object obj) {
+        return idFromValueAndType(obj, obj.getClass());
+    }
+
+    @Override
+    public String idFromValueAndType(Object obj, Class<?> subType) {
+        return ((ExecNodeBase<?>) obj).getContextFromAnnotation().toString();
+    }
+
+    @Override
+    public JavaType typeFromId(DatabindContext context, String id) {
+        ExecNodeContext execNodeContext = new ExecNodeContext(id);
+        return context.constructSpecializedType(
+                superType,
+                ExecNodeMetadataUtil.retrieveExecNode(
+                        execNodeContext.getName(), execNodeContext.getVersion()));
+    }

Review comment:
       I wonder if you can cache the `constructSpecializedType` result for each exec node in `init`. Perhaps you could access the type factory from the super class? Or perhaps there is already a cache in the `DatabindContext`/`TypeFactory` for constructed `JavaType`s?
   
   The reason I ask is that, if I'm not mistaken, constructing a `JavaType` is where the reflection magic happens, so it's quite expensive: it reads the class, all its fields, methods, and annotations, and it even does some `Unsafe`-related work. So if we can trivially avoid repeating it, it's definitely better.
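
To make the suggestion concrete, here is a minimal stdlib-only sketch of the memoization idea, assuming the per-id construction is the expensive step. All names here (`TypeIdCache`, `constructExpensively`) are hypothetical and stand in for the real resolver's call to `constructSpecializedType`:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: construct the type for each id at most once. */
public class TypeIdCache {
    private final Map<String, Class<?>> cache = new ConcurrentHashMap<>();
    private int constructions = 0; // counts how often the expensive path runs

    public Class<?> typeFromId(String id) {
        // computeIfAbsent only invokes the expensive construction on a cache miss
        return cache.computeIfAbsent(id, this::constructExpensively);
    }

    private Class<?> constructExpensively(String id) {
        constructions++; // in the real resolver this would be constructSpecializedType
        return Object.class; // placeholder result for the sketch
    }

    public int constructions() {
        return constructions;
    }
}
```

Repeated lookups of the same id then pay the reflection cost only once.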

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeContext.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonValue;
+
+/**
+ * Helper Pojo that holds the necessary identifier fields that are used for JSON plan serialisation
+ * and de-serialisation.
+ */
+public class ExecNodeContext {
+    /** The unique identifier for each ExecNode in the JSON plan. */

Review comment:
       Can you have these javadocs on the getters instead?
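
For illustration, a simplified sketch (field names and the `parse` helper are hypothetical, not the PR's actual code) of the POJO with the javadoc moved onto the getters, together with the `name_version` string round-trip that the resolver relies on:

```java
/** Illustrative sketch of the helper POJO, with javadocs on the getters. */
public class ExecNodeContextSketch {
    private final String name;
    private final int version;

    public ExecNodeContextSketch(String name, int version) {
        this.name = name;
        this.version = version;
    }

    /** Parses a serialized id of the form {@code name_version}. */
    public static ExecNodeContextSketch parse(String id) {
        int split = id.lastIndexOf('_');
        return new ExecNodeContextSketch(
                id.substring(0, split), Integer.parseInt(id.substring(split + 1)));
    }

    /** The unique name of the ExecNode in the JSON plan. */
    public String getName() {
        return name;
    }

    /** The version of the ExecNode in the JSON plan. */
    public int getVersion() {
        return version;
    }

    @Override
    public String toString() {
        return name + "_" + version;
    }
}
```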

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.utils;
+
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecChangelogNormalize;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCorrelate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDeduplicate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDropUpdateBefore;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExpand;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalGroupAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGlobalWindowAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGroupAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecGroupWindowAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecIncrementalGroupAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecIntervalJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLimit;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalGroupAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLocalWindowAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLookupJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMatch;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecMiniBatchAssigner;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecOverAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCalc;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonCorrelate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupWindowAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonOverAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecRank;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSortLimit;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTemporalSort;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecUnion;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecValues;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWatermarkAssigner;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowAggregate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowDeduplicate;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowJoin;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowRank;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecWindowTableFunction;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/** Utility class for ExecNodeMetadata related functionality. */
+public final class ExecNodeMetadataUtil {
+
+    private ExecNodeMetadataUtil() {
+        // no instantiation
+    }
+
+    private static final Map<ExecNodeNameVersion, Class<? extends ExecNode<?>>> lookupMap =
+            new HashMap<>();
+
+    private static final List<Class<? extends ExecNode<?>>> execNodes = new ArrayList<>();
+
+    static {
+        execNodes.add(StreamExecCalc.class);
+        execNodes.add(StreamExecChangelogNormalize.class);
+        execNodes.add(StreamExecCorrelate.class);
+        execNodes.add(StreamExecDeduplicate.class);
+        execNodes.add(StreamExecDropUpdateBefore.class);
+        execNodes.add(StreamExecExchange.class);
+        execNodes.add(StreamExecExpand.class);
+        execNodes.add(StreamExecGlobalGroupAggregate.class);
+        execNodes.add(StreamExecGlobalWindowAggregate.class);
+        execNodes.add(StreamExecGroupAggregate.class);
+        execNodes.add(StreamExecGroupWindowAggregate.class);
+        execNodes.add(StreamExecIncrementalGroupAggregate.class);
+        execNodes.add(StreamExecIntervalJoin.class);
+        execNodes.add(StreamExecJoin.class);
+        execNodes.add(StreamExecLimit.class);
+        execNodes.add(StreamExecLocalGroupAggregate.class);
+        execNodes.add(StreamExecLocalWindowAggregate.class);
+        execNodes.add(StreamExecLookupJoin.class);
+        execNodes.add(StreamExecMatch.class);
+        execNodes.add(StreamExecMiniBatchAssigner.class);
+        execNodes.add(StreamExecOverAggregate.class);
+        execNodes.add(StreamExecPythonCalc.class);
+        execNodes.add(StreamExecPythonCorrelate.class);
+        execNodes.add(StreamExecPythonGroupAggregate.class);
+        execNodes.add(StreamExecPythonGroupWindowAggregate.class);
+        execNodes.add(StreamExecPythonOverAggregate.class);
+        execNodes.add(StreamExecRank.class);
+        execNodes.add(StreamExecSink.class);
+        execNodes.add(StreamExecSortLimit.class);
+        execNodes.add(StreamExecTableSourceScan.class);
+        execNodes.add(StreamExecTemporalJoin.class);
+        execNodes.add(StreamExecTemporalSort.class);
+        execNodes.add(StreamExecUnion.class);
+        execNodes.add(StreamExecValues.class);
+        execNodes.add(StreamExecWatermarkAssigner.class);
+        execNodes.add(StreamExecWindowAggregate.class);
+        execNodes.add(StreamExecWindowDeduplicate.class);
+        execNodes.add(StreamExecWindowJoin.class);
+        execNodes.add(StreamExecWindowRank.class);
+        execNodes.add(StreamExecWindowTableFunction.class);
+    }
+
+    static {
+        for (Class<? extends ExecNode<?>> execNode : execNodes) {
+            ExecNodeMetadata metadata = execNode.getAnnotation(ExecNodeMetadata.class);
+            if (metadata == null) {
+                throw new IllegalStateException(
+                        "ExecNode: "
+                                + execNode.getSimpleName()
+                                + " is missing "
+                                + ExecNodeMetadata.class.getSimpleName()
+                                + " annotation");

Review comment:
       Add another sentence here:
   
   ```
   This is a bug, please contact developers.
   ```
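
A compact, self-contained sketch of the registration pattern in this hunk, including the suggested extra sentence in the error message. The names (`NodeMeta`, `MetadataRegistry`, `DemoNode`) are illustrative, not Flink's actual classes:

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch: read a RUNTIME annotation per class, fail fast if missing. */
public class MetadataRegistry {
    @Retention(RetentionPolicy.RUNTIME)
    public @interface NodeMeta {
        String name();
        int version();
    }

    @NodeMeta(name = "demo-node", version = 1)
    static class DemoNode {}

    static class UnannotatedNode {}

    private final Map<String, Class<?>> lookup = new HashMap<>();

    public void register(Class<?> node) {
        NodeMeta meta = node.getAnnotation(NodeMeta.class);
        if (meta == null) {
            // fail fast at registration time, not at (de)serialization time
            throw new IllegalStateException(
                    node.getSimpleName()
                            + " is missing NodeMeta annotation."
                            + " This is a bug, please contact developers.");
        }
        lookup.put(meta.name() + "_" + meta.version(), node);
    }

    public Class<?> retrieve(String name, int version) {
        return lookup.get(name + "_" + version);
    }
}
```

Failing in the static initializer this way surfaces a forgotten annotation immediately, rather than as a confusing lookup miss later.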
   
   
   

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeMetadata.java
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation to be used for {@link ExecNode}s to keep necessary metadata when
+ * serialising/deserialising them in a plan.
+ *
+ * <p>Each {@link ExecNode} needs to be annotated and provide the necessary metadata info so that it
+ * can be correctly serialised and later on instantiated from a string (JSON) plan.
+ *
+ * <p>It's possible for one {@link ExecNode} class to use multiple annotations to denote ability to
+ * upgrade to more versions.
+ */
+@Documented
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@PublicEvolving
+public @interface ExecNodeMetadata {
+    // main information
+
+    /**
+     * Unique name of the {@link ExecNode} for serialization/deserialization and uid() generation.
+     * Together with version, uniquely identifies the {@link ExecNode} class.
+     */
+    String name();
+
+    /**
+     * A positive integer denoting the evolving version of an {@link ExecNode}, used for
+     * serialization/deserialization and uid() generation. Together with {@link #name()}, uniquely
+     * identifies the {@link ExecNode} class.
+     */
+    @JsonProperty("version")
+    int version();
+
+    // maintenance information for internal/community/test usage
+
+    /**
+     * Hard coded list of {@link ExecutionConfigOptions} keys in the Flink version when the

Review comment:
       Replace `list` with `set`

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeTypeIdResolver.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec;
+
+import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DatabindContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase;
+
+/**
+ * Helper class to implement the Jackson subtype serialisation/de-serialisation. Instead of using
+ * the class name use the {@link ExecNodeMetadata#name()} and {@link ExecNodeMetadata#version()} to
+ * perform a lookup in a static map residing in {@link ExecNodeMetadataUtil}.
+ */
+public class ExecNodeTypeIdResolver extends TypeIdResolverBase {
+
+    private JavaType superType;
+
+    @Override
+    public void init(JavaType baseType) {
+        superType = baseType;
+    }
+
+    @Override
+    public Id getMechanism() {
+        return Id.NAME;
+    }
+
+    @Override
+    public String idFromValue(Object obj) {
+        return idFromValueAndType(obj, obj.getClass());
+    }
+
+    @Override
+    public String idFromValueAndType(Object obj, Class<?> subType) {
+        return ((ExecNodeBase<?>) obj).getContextFromAnnotation().toString();
+    }
+
+    @Override
+    public JavaType typeFromId(DatabindContext context, String id) {
+        ExecNodeContext execNodeContext = new ExecNodeContext(id);
+        return context.constructSpecializedType(
+                superType,
+                ExecNodeMetadataUtil.retrieveExecNode(
+                        execNodeContext.getName(), execNodeContext.getVersion()));
+    }

Review comment:
       OK, as I was expecting: there is already an LRU cache within `TypeFactory` that takes care of caching constructed `JavaType`s. So you can ignore my comment if you want.
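
For readers curious what such a bounded cache looks like, here is a minimal stdlib sketch built on `LinkedHashMap` in access order. This is illustrative only and is not Jackson's actual `TypeFactory` internals:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative bounded LRU cache: least-recently-accessed entries are evicted. */
public class LruTypeCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    public LruTypeCache(int maxEntries) {
        super(16, 0.75f, true); // access-order: gets refresh an entry's recency
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // called after each put; evict once the cache exceeds its bound
        return size() > maxEntries;
    }
}
```

With a bound of 2, inserting a third entry evicts whichever of the first two was touched least recently.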




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org