Posted to commits@flink.apache.org by tw...@apache.org on 2022/08/16 07:27:54 UTC

[flink] branch release-1.15 updated: [FLINK-28861][table] Make UID generation behavior configurable and plan-only by default

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 105d7c911bd [FLINK-28861][table] Make UID generation behavior configurable and plan-only by default
105d7c911bd is described below

commit 105d7c911bd0c5d8634417c22164547651abf07b
Author: Timo Walther <tw...@apache.org>
AuthorDate: Wed Aug 10 15:30:50 2022 +0200

    [FLINK-28861][table] Make UID generation behavior configurable and plan-only by default
    
    Before this commit, due to changes for FLIP-190, every operator generated by the planner
    got a UID assigned. However, that UID is based on a static counter that might return
    different results depending on the environment. Thus, UIDs are not deterministic and make
    stateful restores impossible, e.g., when going from 1.15.0 -> 1.15.1. This PR restores the
    old pre-1.15 behavior for the regular Table API: it only adds UIDs if the operator has been
    created from a compiled plan. A compiled plan makes the UIDs static and thus deterministic.
    
    table.exec.uid.generation=ALWAYS exists for backwards compatibility and could make stateful
    upgrades possible even with invalid UIDs, on a best-effort basis.
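    
    For illustration, a minimal sketch of opting into that mode (not part of this commit;
    assumes a streaming TableEnvironment on 1.15.2+):
    
        import org.apache.flink.table.api.EnvironmentSettings;
        import org.apache.flink.table.api.TableEnvironment;
        import org.apache.flink.table.api.config.ExecutionConfigOptions;
    
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Opt into the 1.15.0/1.15.1 behavior for a stateful patch version upgrade.
        env.getConfig()
                .set(ExecutionConfigOptions.TABLE_EXEC_UID_GENERATION,
                        ExecutionConfigOptions.UidGeneration.ALWAYS);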
    
    This closes #20586.
---
 .../generated/execution_config_configuration.html  |  6 ++
 .../connector/file/table/stream/StreamingSink.java | 59 +++++++------
 .../connectors/hive/HiveTableSourceITCase.java     |  6 +-
 .../flink/table/connector/ProviderContext.java     |  4 +-
 .../table/api/config/ExecutionConfigOptions.java   | 80 ++++++++++++++++--
 .../table/planner/plan/nodes/exec/ExecNode.java    |  6 ++
 .../planner/plan/nodes/exec/ExecNodeBase.java      | 43 ++++++----
 .../planner/plan/nodes/exec/ExecNodeConfig.java    | 37 +++++++-
 .../plan/nodes/exec/ExecNodeGraphGenerator.java    | 10 +--
 .../plan/nodes/exec/ExecNodeTranslator.java        |  2 +-
 .../plan/nodes/exec/common/CommonExecSink.java     | 18 ++--
 .../exec/common/CommonExecTableSourceScan.java     |  6 +-
 .../plan/nodes/exec/serde/JsonSerdeUtil.java       |  8 +-
 .../nodes/exec/stream/StreamExecIntervalJoin.java  |  4 +-
 .../plan/nodes/exec/stream/StreamExecMatch.java    |  2 +-
 .../table/planner/delegation/PlannerBase.scala     | 12 +--
 .../table/planner/delegation/StreamPlanner.scala   |  2 +-
 .../plan/nodes/physical/FlinkPhysicalRel.scala     | 16 ++++
 .../table/api/internal/CompiledPlanUtils.java      |  4 +-
 .../plan/nodes/exec/TestingBatchExecNode.java      |  5 ++
 .../plan/nodes/exec/TransformationsTest.java       | 98 ++++++++++++++++------
 .../MultipleInputNodeCreationProcessorTest.java    |  3 +-
 .../planner/utils/InternalConfigOptionsTest.java   |  3 +-
 .../flink/table/planner/utils/TableTestBase.scala  |  4 +-
 24 files changed, 326 insertions(+), 112 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/execution_config_configuration.html b/docs/layouts/shortcodes/generated/execution_config_configuration.html
index 364d9bf9d81..7ed185699e6 100644
--- a/docs/layouts/shortcodes/generated/execution_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html
@@ -154,6 +154,12 @@ By default no operator is disabled.</td>
             <td>Duration</td>
             <td>Specifies a minimum time interval for how long idle state (i.e. state which was not updated), will be retained. State will never be cleared until it was idle for less than the minimum time, and will be cleared at some time after it was idle. Default is never clean-up the state. NOTE: Cleaning up state requires additional overhead for bookkeeping. Default value is 0, which means that it will never clean up state.</td>
         </tr>
+        <tr>
+            <td><h5>table.exec.uid.generation</h5><br> <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">PLAN_ONLY</td>
+            <td><p>Enum</p></td>
+            <td>This configuration option was introduced in 1.15.2 to fix the incorrect behavior of 1.15.0 and 1.15.1. Use PLAN_ONLY for new pipelines and ALWAYS in case of stateful patch version upgrades.<br />In order to remap state to operators during a restore, it is required that the pipeline's streaming transformations get a UID assigned.<br />The planner can generate and assign explicit UIDs. If no UIDs have been set by the planner, the UIDs will be auto-generated by lower layers  [...]
+        </tr>
         <tr>
             <td><h5>table.exec.window-agg.buffer-size-limit</h5><br> <span class="label label-primary">Batch</span></td>
             <td style="word-wrap: break-word;">100000</td>
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/StreamingSink.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/StreamingSink.java
index 140f1596c8f..ee86f08654d 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/StreamingSink.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/StreamingSink.java
@@ -71,13 +71,15 @@ public class StreamingSink {
             Configuration conf) {
         StreamingFileWriter<T> fileWriter =
                 new StreamingFileWriter<>(bucketCheckInterval, bucketsBuilder, partitionKeys, conf);
-        return inputStream
-                .transform(
-                        StreamingFileWriter.class.getSimpleName(),
-                        TypeInformation.of(PartitionCommitInfo.class),
-                        fileWriter)
-                .uid(providerContext.generateUid("streaming-writer").get())
-                .setParallelism(parallelism);
+        SingleOutputStreamOperator<PartitionCommitInfo> writerStream =
+                inputStream
+                        .transform(
+                                StreamingFileWriter.class.getSimpleName(),
+                                TypeInformation.of(PartitionCommitInfo.class),
+                                fileWriter)
+                        .setParallelism(parallelism);
+        providerContext.generateUid("streaming-writer").ifPresent(writerStream::uid);
+        return writerStream;
     }
 
     /**
@@ -104,21 +106,24 @@ public class StreamingSink {
 
         CompactCoordinator coordinator = new CompactCoordinator(fsSupplier, targetFileSize);
 
-        SingleOutputStreamOperator<CoordinatorOutput> coordinatorOp =
+        SingleOutputStreamOperator<CoordinatorInput> writerStream =
                 inputStream
                         .transform(
                                 "streaming-writer",
                                 TypeInformation.of(CoordinatorInput.class),
                                 writer)
-                        .uid(providerContext.generateUid("streaming-writer").get())
-                        .setParallelism(parallelism)
+                        .setParallelism(parallelism);
+        providerContext.generateUid("streaming-writer").ifPresent(writerStream::uid);
+
+        SingleOutputStreamOperator<CoordinatorOutput> coordinatorStream =
+                writerStream
                         .transform(
                                 "compact-coordinator",
                                 TypeInformation.of(CoordinatorOutput.class),
                                 coordinator)
-                        .uid(providerContext.generateUid("compact-coordinator").get())
                         .setParallelism(1)
                         .setMaxParallelism(1);
+        providerContext.generateUid("compact-coordinator").ifPresent(coordinatorStream::uid);
 
         CompactWriter.Factory<T> writerFactory =
                 CompactBucketWriter.factory(
@@ -128,14 +133,17 @@ public class StreamingSink {
         CompactOperator<T> compacter =
                 new CompactOperator<>(fsSupplier, readFactory, writerFactory);
 
-        return coordinatorOp
-                .broadcast()
-                .transform(
-                        "compact-operator",
-                        TypeInformation.of(PartitionCommitInfo.class),
-                        compacter)
-                .uid(providerContext.generateUid("compact-operator").get())
-                .setParallelism(parallelism);
+        SingleOutputStreamOperator<PartitionCommitInfo> operatorStream =
+                coordinatorStream
+                        .broadcast()
+                        .transform(
+                                "compact-operator",
+                                TypeInformation.of(PartitionCommitInfo.class),
+                                compacter)
+                        .setParallelism(parallelism);
+        providerContext.generateUid("compact-operator").ifPresent(operatorStream::uid);
+
+        return operatorStream;
     }
 
     /**
@@ -156,17 +164,18 @@ public class StreamingSink {
             PartitionCommitter committer =
                     new PartitionCommitter(
                             locationPath, identifier, partitionKeys, msFactory, fsFactory, options);
-            stream =
+            SingleOutputStreamOperator<Void> committerStream =
                     writer.transform(
                                     PartitionCommitter.class.getSimpleName(), Types.VOID, committer)
-                            .uid(providerContext.generateUid("partition-committer").get())
                             .setParallelism(1)
                             .setMaxParallelism(1);
+            providerContext.generateUid("partition-committer").ifPresent(committerStream::uid);
+            stream = committerStream;
         }
 
-        return stream.addSink(new DiscardingSink<>())
-                .uid(providerContext.generateUid("discarding-sink").get())
-                .name("end")
-                .setParallelism(1);
+        DataStreamSink<?> discardingSink =
+                stream.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
+        providerContext.generateUid("discarding-sink").ifPresent(discardingSink::uid);
+        return discardingSink;
     }
 }
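
The rewrite above replaces each unconditional .uid(providerContext.generateUid(...).get())
chain with Optional.ifPresent, so that an empty Optional (now returned when no UID is
generated, e.g. in batch mode or under PLAN_ONLY without a compiled plan) is handled
gracefully instead of .get() throwing NoSuchElementException. A condensed sketch of the
pattern; input, myOperator, and parallelism are placeholders, not names from the commit:

    SingleOutputStreamOperator<PartitionCommitInfo> op =
            input.transform(
                            "my-operator",
                            TypeInformation.of(PartitionCommitInfo.class),
                            myOperator)
                    .setParallelism(parallelism);
    // Assign a UID only if one can be generated; an empty Optional is ignored.
    providerContext.generateUid("my-operator").ifPresent(op::uid);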
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
index df3023fdb74..e1d7c499924 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
@@ -482,7 +482,7 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
         PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) tEnv).getPlanner();
         RelNode relNode = planner.optimize(TableTestUtil.toRelNode(table));
         ExecNode<?> execNode =
-                planner.translateToExecNodeGraph(toScala(Collections.singletonList(relNode)))
+                planner.translateToExecNodeGraph(toScala(Collections.singletonList(relNode)), false)
                         .getRootNodes()
                         .get(0);
         Transformation<?> transformation = execNode.translateToPlan(planner);
@@ -513,7 +513,7 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
         PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) tEnv).getPlanner();
         RelNode relNode = planner.optimize(TableTestUtil.toRelNode(table));
         ExecNode<?> execNode =
-                planner.translateToExecNodeGraph(toScala(Collections.singletonList(relNode)))
+                planner.translateToExecNodeGraph(toScala(Collections.singletonList(relNode)), false)
                         .getRootNodes()
                         .get(0);
         Transformation<?> transformation =
@@ -547,7 +547,7 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
         PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) tEnv).getPlanner();
         RelNode relNode = planner.optimize(TableTestUtil.toRelNode(table));
         ExecNode<?> execNode =
-                planner.translateToExecNodeGraph(toScala(Collections.singletonList(relNode)))
+                planner.translateToExecNodeGraph(toScala(Collections.singletonList(relNode)), false)
                         .getRootNodes()
                         .get(0);
         Transformation<?> transformation =
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/ProviderContext.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/ProviderContext.java
index 46c9cfa206c..c36aa683aad 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/ProviderContext.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/ProviderContext.java
@@ -38,8 +38,8 @@ public interface ProviderContext {
      * The {@code name} must be unique within the provider implementation. The framework will make
      * sure that the name is unique for the entire topology.
      *
-     * <p>This method returns empty if an identifier cannot be generated, i.e. because the job is in
-     * batch mode.
+     * <p>This method returns empty if an identifier cannot be generated, i.e., because the job is
+     * in batch mode, or UIDs cannot be guaranteed to be unique.
      */
     Optional<String> generateUid(String name);
 }
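
On the implementing side, the planner supplies this context as a lambda; a minimal sketch
consistent with the CommonExecSink change further below (isStreamingNode and shouldSetUid
stand in for the planner's actual checks):

    ProviderContext providerContext =
            name ->
                    isStreamingNode && shouldSetUid
                            ? Optional.of("uid-prefix_" + name)
                            : Optional.empty();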
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index 9aa8642dee4..76c681eb42f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -478,6 +478,7 @@ public class ExecutionConfigOptions {
                                             + "all changes to downstream just like when the mini-batch is "
                                             + "not enabled.");
 
+    /** @deprecated Use {@link #TABLE_EXEC_UID_GENERATION} instead. */
     @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
     @Deprecated
     public static final ConfigOption<Boolean> TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS =
@@ -485,11 +486,43 @@ public class ExecutionConfigOptions {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription(
-                            "In Flink 1.15 Transformation UIDs are generated deterministically starting from the metadata available after the planning phase. "
-                                    + "This new behaviour allows a safe restore of persisted plan, remapping the plan execution graph to the correct operators state. "
-                                    + "Setting this flag to true enables the previous \"legacy\" behavior, which is generating uids from the Transformation graph topology. "
-                                    + "We strongly suggest to keep this flag disabled, as this flag is going to be removed in the next releases. "
-                                    + "If you have a pipeline relying on the old behavior, please create a new pipeline and regenerate the operators state.");
+                            "This flag has been replaced by table.exec.uid.generation. Use the enum "
+                                    + "value DISABLED to restore legacy behavior. However, the new "
+                                    + "default value should be sufficient for most use cases as "
+                                    + "only pipelines from compiled plans get UIDs assigned.");
+
+    @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+    public static final ConfigOption<UidGeneration> TABLE_EXEC_UID_GENERATION =
+            key("table.exec.uid.generation")
+                    .enumType(UidGeneration.class)
+                    .defaultValue(UidGeneration.PLAN_ONLY)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "This configuration option was introduced in 1.15.2 to fix the incorrect "
+                                                    + "behavior of 1.15.0 and 1.15.1. Use PLAN_ONLY for new pipelines and "
+                                                    + "ALWAYS in case of stateful patch version upgrades.")
+                                    .linebreak()
+                                    .text(
+                                            "In order to remap state to operators during a restore, "
+                                                    + "it is required that the pipeline's streaming "
+                                                    + "transformations get a UID assigned.")
+                                    .linebreak()
+                                    .text(
+                                            "The planner can generate and assign explicit UIDs. If no "
+                                                    + "UIDs have been set by the planner, the UIDs will "
+                                                    + "be auto-generated by lower layers that can take "
+                                                    + "the complete topology into account for uniqueness "
+                                                    + "of the IDs. See the DataStream API for more information.")
+                                    .linebreak()
+                                    .text(
+                                            "This configuration option is for experts only and the default "
+                                                    + "should be sufficient for most use cases. By default, "
+                                                    + "only pipelines created from a persisted compiled plan will "
+                                                    + "get UIDs assigned explicitly. Thus, these pipelines can "
+                                                    + "be arbitrarily moved around within the same topology without "
+                                                    + "affecting the stable UIDs.")
+                                    .build());
 
     // ------------------------------------------------------------------------------------------
     // Enum option types
@@ -597,4 +630,41 @@ public class ExecutionConfigOptions {
             return enabled;
         }
     }
+
+    /**
+     * Strategy for generating transformation UIDs for remapping state to operators during restore.
+     */
+    @PublicEvolving
+    public enum UidGeneration implements DescribedEnum {
+        PLAN_ONLY(
+                text(
+                        "Recommended for new 1.15.2+ pipelines. "
+                                + "Sets UIDs on streaming transformations if and only if the pipeline definition "
+                                + "comes from a compiled plan. Pipelines that have been constructed in "
+                                + "the API without a compilation step will not set an explicit UID as "
+                                + "it might not be stable across multiple translations.")),
+        ALWAYS(
+                text(
+                        "Default in 1.15.0 and 1.15.1, recommended for existing pipelines of these versions. "
+                                + "Always sets UIDs on streaming transformations. This strategy is for experts only! "
+                                + "Pipelines that have been constructed in the API without a compilation "
+                                + "step might not be able to be restored properly. The UID generation "
+                                + "depends on previously declared pipelines (potentially across jobs "
+                                + "if the same JVM is used). Thus, a stable environment must be ensured. "
+                                + "Pipeline definitions that come from a compiled plan are safe to use.")),
+
+        DISABLED(text("No explicit UIDs will be set."));
+
+        private final InlineElement description;
+
+        UidGeneration(InlineElement description) {
+            this.description = description;
+        }
+
+        @Internal
+        @Override
+        public InlineElement getDescription() {
+            return description;
+        }
+    }
 }
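
Since table.exec.uid.generation is a regular ConfigOption, the resolved strategy can be read
back from any ReadableConfig; a small sketch:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.api.config.ExecutionConfigOptions;

    Configuration conf = new Configuration();
    // Nothing set explicitly: resolves to the default, PLAN_ONLY.
    ExecutionConfigOptions.UidGeneration strategy =
            conf.get(ExecutionConfigOptions.TABLE_EXEC_UID_GENERATION);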
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
index d2b06fd5342..dc18eb58a5e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNode.java
@@ -116,4 +116,10 @@ public interface ExecNode<T> extends ExecNodeTranslator<T> {
      * @param visitor ExecNodeVisitor.
      */
     void accept(ExecNodeVisitor visitor);
+
+    /**
+     * Declares whether the node has been created as part of a plan compilation. Some translation
+     * properties might be impacted by this (e.g. UID generation for transformations).
+     */
+    void setCompiled(boolean isCompiled);
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
index 06b88507b3f..21b0f9dd8e1 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.visitor.ExecNodeVisitor;
 import org.apache.flink.table.planner.plan.utils.ExecNodeMetadataUtil;
 import org.apache.flink.table.types.logical.LogicalType;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JacksonInject;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
@@ -49,6 +50,15 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @JsonIgnoreProperties(ignoreUnknown = true)
 public abstract class ExecNodeBase<T> implements ExecNode<T> {
 
+    /**
+     * The default value of this flag is false. Other cases must set this flag accordingly via
+     * {@link #setCompiled(boolean)}. It is not exposed via a constructor arg to avoid complex
+     * constructor overloading for all {@link ExecNode}s. However, during deserialization this flag
+     * will always be set to true.
+     */
+    @JacksonInject("isDeserialize")
+    private boolean isCompiled;
+
     private final String description;
 
     private final LogicalType outputType;
@@ -147,8 +157,10 @@ public abstract class ExecNodeBase<T> implements ExecNode<T> {
             transformation =
                     translateToPlanInternal(
                             (PlannerBase) planner,
-                            new ExecNodeConfig(
-                                    ((PlannerBase) planner).getTableConfig(), persistedConfig));
+                            ExecNodeConfig.of(
+                                    ((PlannerBase) planner).getTableConfig(),
+                                    persistedConfig,
+                                    isCompiled));
             if (this instanceof SingleTransformationTranslator) {
                 if (inputsContainSingleton()) {
                     transformation.setParallelism(1);
@@ -159,6 +171,16 @@ public abstract class ExecNodeBase<T> implements ExecNode<T> {
         return transformation;
     }
 
+    @Override
+    public void accept(ExecNodeVisitor visitor) {
+        visitor.visit(this);
+    }
+
+    @Override
+    public void setCompiled(boolean compiled) {
+        isCompiled = compiled;
+    }
+
     /**
      * Internal method, translates this node into a Flink operator.
      *
@@ -171,11 +193,6 @@ public abstract class ExecNodeBase<T> implements ExecNode<T> {
     protected abstract Transformation<T> translateToPlanInternal(
             PlannerBase planner, ExecNodeConfig config);
 
-    @Override
-    public void accept(ExecNodeVisitor visitor) {
-        visitor.visit(this);
-    }
-
     /** Whether singleton distribution is required. */
     protected boolean inputsContainSingleton() {
         return getInputProperties().stream()
@@ -203,13 +220,11 @@ public abstract class ExecNodeBase<T> implements ExecNode<T> {
     }
 
     protected TransformationMetadata createTransformationMeta(
-            String operatorName, ReadableConfig config) {
-        if (ExecNodeMetadataUtil.isUnsupported(this.getClass())
-                || config.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS)) {
+            String operatorName, ExecNodeConfig config) {
+        if (ExecNodeMetadataUtil.isUnsupported(this.getClass()) || !config.shouldSetUid()) {
             return new TransformationMetadata(
                     createTransformationName(config), createTransformationDescription(config));
         } else {
-            // Only classes supporting metadata util need to set the uid
             return new TransformationMetadata(
                     createTransformationUid(operatorName),
                     createTransformationName(config),
@@ -218,14 +233,12 @@ public abstract class ExecNodeBase<T> implements ExecNode<T> {
     }
 
     protected TransformationMetadata createTransformationMeta(
-            String operatorName, String detailName, String simplifiedName, ReadableConfig config) {
+            String operatorName, String detailName, String simplifiedName, ExecNodeConfig config) {
         final String name = createFormattedTransformationName(detailName, simplifiedName, config);
         final String desc = createFormattedTransformationDescription(detailName, config);
-        if (ExecNodeMetadataUtil.isUnsupported(this.getClass())
-                || config.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS)) {
+        if (ExecNodeMetadataUtil.isUnsupported(this.getClass()) || !config.shouldSetUid()) {
             return new TransformationMetadata(name, desc);
         } else {
-            // Only classes supporting metadata util need to set the uid
             return new TransformationMetadata(createTransformationUid(operatorName), name, desc);
         }
     }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java
index ebfe752141b..7daf8a27f7c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeConfig.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.ExecutionConfigOptions.UidGeneration;
 import org.apache.flink.table.planner.delegation.PlannerBase;
 
 import java.util.Optional;
@@ -32,6 +33,10 @@ import java.util.Optional;
  * ExecNodeBase#getPersistedConfig()} configuration. The persisted configuration of the {@link
  * ExecNode} which is deserialized from the JSON plan has precedence over the {@link
  * PlannerBase#getTableConfig()}.
+ *
+ * <p>This class is intended to contain additional context information for {@link ExecNode}
+ * translation such as {@link #shouldSetUid()} or helper methods for accessing configuration such as
+ * {@link #getStateRetentionTime()}.
  */
 @Internal
 public final class ExecNodeConfig implements ReadableConfig {
@@ -40,9 +45,17 @@ public final class ExecNodeConfig implements ReadableConfig {
 
     private final ReadableConfig nodeConfig;
 
-    ExecNodeConfig(TableConfig tableConfig, ReadableConfig nodeConfig) {
+    private final boolean isCompiled;
+
+    private ExecNodeConfig(TableConfig tableConfig, ReadableConfig nodeConfig, boolean isCompiled) {
         this.nodeConfig = nodeConfig;
         this.tableConfig = tableConfig;
+        this.isCompiled = isCompiled;
+    }
+
+    static ExecNodeConfig of(
+            TableConfig tableConfig, ReadableConfig nodeConfig, boolean isCompiled) {
+        return new ExecNodeConfig(tableConfig, nodeConfig, isCompiled);
     }
 
     /**
@@ -73,4 +86,26 @@ public final class ExecNodeConfig implements ReadableConfig {
     public long getStateRetentionTime() {
         return get(ExecutionConfigOptions.IDLE_STATE_RETENTION).toMillis();
     }
+
+    /** @return Whether the {@link ExecNode} translation happens as part of a plan compilation. */
+    public boolean isCompiled() {
+        return isCompiled;
+    }
+
+    /** @return Whether transformations should set a UID. */
+    public boolean shouldSetUid() {
+        final UidGeneration uidGeneration = get(ExecutionConfigOptions.TABLE_EXEC_UID_GENERATION);
+        switch (uidGeneration) {
+            case PLAN_ONLY:
+                return isCompiled
+                        && !get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS);
+            case ALWAYS:
+                return true;
+            case DISABLED:
+                return false;
+            default:
+                throw new IllegalArgumentException(
+                        "Unknown UID generation strategy: " + uidGeneration);
+        }
+    }
 }
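
Taken together, shouldSetUid() means: under PLAN_ONLY, UIDs are set only for nodes that
originate from a compiled plan (and only if the deprecated legacy flag is not enabled); under
ALWAYS they are set unconditionally; under DISABLED they are never set.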
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeGraphGenerator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeGraphGenerator.java
index 02213bd4440..beadfc4f125 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeGraphGenerator.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeGraphGenerator.java
@@ -48,15 +48,15 @@ public class ExecNodeGraphGenerator {
         this.visitedRels = new IdentityHashMap<>();
     }
 
-    public ExecNodeGraph generate(List<FlinkPhysicalRel> relNodes) {
+    public ExecNodeGraph generate(List<FlinkPhysicalRel> relNodes, boolean isCompiled) {
         List<ExecNode<?>> rootNodes = new ArrayList<>(relNodes.size());
         for (FlinkPhysicalRel relNode : relNodes) {
-            rootNodes.add(generate(relNode));
+            rootNodes.add(generate(relNode, isCompiled));
         }
         return new ExecNodeGraph(rootNodes);
     }
 
-    private ExecNode<?> generate(FlinkPhysicalRel rel) {
+    private ExecNode<?> generate(FlinkPhysicalRel rel, boolean isCompiled) {
         ExecNode<?> execNode = visitedRels.get(rel);
         if (execNode != null) {
             return execNode;
@@ -68,10 +68,10 @@ public class ExecNodeGraphGenerator {
 
         List<ExecNode<?>> inputNodes = new ArrayList<>();
         for (RelNode input : rel.getInputs()) {
-            inputNodes.add(generate((FlinkPhysicalRel) input));
+            inputNodes.add(generate((FlinkPhysicalRel) input, isCompiled));
         }
 
-        execNode = rel.translateToExecNode();
+        execNode = rel.translateToExecNode(isCompiled);
         // connects the input nodes
         List<ExecEdge> inputEdges = new ArrayList<>(inputNodes.size());
         for (ExecNode<?> inputNode : inputNodes) {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeTranslator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeTranslator.java
index ccf4a4fad6d..74899c5b50e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeTranslator.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeTranslator.java
@@ -36,7 +36,7 @@ public interface ExecNodeTranslator<T> {
      *
      * <p>NOTE: This method should return same translate result if called multiple times.
      *
-     * @param planner The {@link Planner} of the translated Table.
+     * @param planner The {@link Planner} of the translated graph.
      */
     Transformation<T> translateToPlan(Planner planner);
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index 00cf4daea73..8082a15a313 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -52,6 +52,7 @@ import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
 import org.apache.flink.table.planner.connectors.TransformationSinkProvider;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
@@ -135,7 +136,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
     @SuppressWarnings("unchecked")
     protected Transformation<Object> createSinkTransformation(
             StreamExecutionEnvironment streamExecEnv,
-            ReadableConfig config,
+            ExecNodeConfig config,
             Transformation<RowData> inputTransform,
             DynamicTableSink tableSink,
             int rowtimeFieldIndex,
@@ -203,7 +204,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
      */
     private Transformation<RowData> applyConstraintValidations(
             Transformation<RowData> inputTransform,
-            ReadableConfig config,
+            ExecNodeConfig config,
             RowType physicalRowType) {
         final ConstraintEnforcer.Builder validatorBuilder = ConstraintEnforcer.newBuilder();
         final String[] fieldNames = physicalRowType.getFieldNames().toArray(new String[0]);
@@ -348,7 +349,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
      * messages.
      */
     private Transformation<RowData> applyKeyBy(
-            ReadableConfig config,
+            ExecNodeConfig config,
             Transformation<RowData> inputTransform,
             int[] primaryKeys,
             int sinkParallelism,
@@ -391,7 +392,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
             Transformation<RowData> inputTransform,
             int[] primaryKeys,
             int sinkParallelism,
-            ReadableConfig config,
+            ExecNodeConfig config,
             RowType physicalRowType) {
         GeneratedRecordEqualiser equaliser =
                 new EqualiserCodeGenerator(physicalRowType)
@@ -435,7 +436,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
             SinkRuntimeProvider runtimeProvider,
             int rowtimeFieldIndex,
             int sinkParallelism,
-            ReadableConfig config) {
+            ExecNodeConfig config) {
         TransformationMetadata sinkMeta = createTransformationMeta(SINK_TRANSFORMATION, config);
         if (runtimeProvider instanceof DataStreamSinkProvider) {
             Transformation<RowData> sinkTransformation =
@@ -519,10 +520,9 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
         }
     }
 
-    private ProviderContext createProviderContext(ReadableConfig config) {
+    private ProviderContext createProviderContext(ExecNodeConfig config) {
         return name -> {
-            if (this instanceof StreamExecNode
-                    && !config.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS)) {
+            if (this instanceof StreamExecNode && config.shouldSetUid()) {
                 return Optional.of(createTransformationUid(name));
             }
             return Optional.empty();
@@ -557,7 +557,7 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
             Transformation<RowData> inputTransform,
             int rowtimeFieldIndex,
             int sinkParallelism,
-            ReadableConfig config) {
+            ExecNodeConfig config) {
         // Don't apply the transformation/operator if there is no rowtimeFieldIndex
         if (rowtimeFieldIndex == -1) {
             return inputTransform;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
index 07e193b81e5..79efe0dc81d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java
@@ -30,7 +30,6 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.connector.ProviderContext;
 import org.apache.flink.table.connector.source.DataStreamScanProvider;
 import org.apache.flink.table.connector.source.InputFormatProvider;
@@ -152,10 +151,9 @@ public abstract class CommonExecTableSourceScan extends ExecNodeBase<RowData>
         }
     }
 
-    private ProviderContext createProviderContext(ReadableConfig config) {
+    private ProviderContext createProviderContext(ExecNodeConfig config) {
         return name -> {
-            if (this instanceof StreamExecNode
-                    && !config.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS)) {
+            if (this instanceof StreamExecNode && config.shouldSetUid()) {
                 return Optional.of(createTransformationUid(name));
             }
             return Optional.empty();
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
index 8a601601d3f..ee43d1b6bce 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/JsonSerdeUtil.java
@@ -47,6 +47,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.ObjectCodec;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.TreeNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.InjectableValues;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.MapperFeature;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.Module;
@@ -117,7 +118,8 @@ public class JsonSerdeUtil {
     public static ObjectReader createObjectReader(SerdeContext serdeContext) {
         return OBJECT_MAPPER_INSTANCE
                 .reader()
-                .withAttribute(SerdeContext.SERDE_CONTEXT_KEY, serdeContext);
+                .withAttribute(SerdeContext.SERDE_CONTEXT_KEY, serdeContext)
+                .with(defaultInjectedValues());
     }
 
     public static ObjectWriter createObjectWriter(SerdeContext serdeContext) {
@@ -126,6 +128,10 @@ public class JsonSerdeUtil {
                 .withAttribute(SerdeContext.SERDE_CONTEXT_KEY, serdeContext);
     }
 
+    private static InjectableValues defaultInjectedValues() {
+        return new InjectableValues.Std().addValue("isDeserialize", true);
+    }
+
     private static Module createFlinkTableJacksonModule() {
         final SimpleModule module = new SimpleModule("Flink table module");
         ExecNodeMetadataUtil.execNodes()
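
The @JacksonInject("isDeserialize") field in ExecNodeBase together with these injected values
is what flips isCompiled to true for every node restored from a JSON plan. A self-contained
sketch of the mechanism (plain Jackson shown here; Flink uses the shaded
org.apache.flink.shaded.jackson2 classes):

    import com.fasterxml.jackson.annotation.JacksonInject;
    import com.fasterxml.jackson.databind.InjectableValues;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class InjectDemo {
        static class Node {
            @JacksonInject("isDeserialize")
            public boolean isCompiled; // stays false when constructed in memory
            public int id;
        }

        public static void main(String[] args) throws Exception {
            Node node =
                    new ObjectMapper()
                            .reader()
                            .with(new InjectableValues.Std().addValue("isDeserialize", true))
                            .forType(Node.class)
                            .readValue("{\"id\": 1}");
            System.out.println(node.isCompiled); // prints: true
        }
    }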
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
index 4c6cf122138..15f58d70ed5 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIntervalJoin.java
@@ -334,7 +334,7 @@ public class StreamExecIntervalJoin extends ExecNodeBase<RowData>
             IntervalJoinFunction joinFunction,
             JoinSpec joinSpec,
             IntervalJoinSpec.WindowBounds windowBounds,
-            ReadableConfig config) {
+            ExecNodeConfig config) {
         InternalTypeInfo<RowData> leftTypeInfo =
                 (InternalTypeInfo<RowData>) leftInputTransform.getOutputType();
         InternalTypeInfo<RowData> rightTypeInfo =
@@ -364,7 +364,7 @@ public class StreamExecIntervalJoin extends ExecNodeBase<RowData>
             IntervalJoinFunction joinFunction,
             JoinSpec joinSpec,
             IntervalJoinSpec.WindowBounds windowBounds,
-            ReadableConfig config) {
+            ExecNodeConfig config) {
 
         InternalTypeInfo<RowData> leftTypeInfo =
                 (InternalTypeInfo<RowData>) leftInputTransform.getOutputType();
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
index 74b12c5050b..1dd29be640b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMatch.java
@@ -269,7 +269,7 @@ public class StreamExecMatch extends ExecNodeBase<RowData>
     }
 
     private Transformation<RowData> translateOrder(
-            Transformation<RowData> inputTransform, RowType inputRowType, ReadableConfig config) {
+            Transformation<RowData> inputTransform, RowType inputRowType, ExecNodeConfig config) {
         SortSpec.SortFieldSpec timeOrderField = matchSpec.getOrderKeys().getFieldSpec(0);
         int timeOrderFieldIdx = timeOrderField.getFieldIndex();
         LogicalType timeOrderFieldType = inputRowType.getTypeAt(timeOrderFieldIdx);
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 6173fd55d2f..aa711e5ceb1 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -19,7 +19,6 @@ package org.apache.flink.table.planner.delegation
 
 import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.api.dag.Transformation
-import org.apache.flink.configuration.ReadableConfig
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.graph.StreamGraph
 import org.apache.flink.table.api._
@@ -185,7 +184,7 @@ abstract class PlannerBase(
 
     val relNodes = modifyOperations.map(translateToRel)
     val optimizedRelNodes = optimize(relNodes)
-    val execGraph = translateToExecNodeGraph(optimizedRelNodes)
+    val execGraph = translateToExecNodeGraph(optimizedRelNodes, isCompiled = false)
     val transformations = translateToPlan(execGraph)
     afterTranslation()
     transformations
@@ -321,7 +320,9 @@ abstract class PlannerBase(
    * transforms the graph based on the given processors.
    */
   @VisibleForTesting
-  private[flink] def translateToExecNodeGraph(optimizedRelNodes: Seq[RelNode]): ExecNodeGraph = {
+  private[flink] def translateToExecNodeGraph(
+      optimizedRelNodes: Seq[RelNode],
+      isCompiled: Boolean): ExecNodeGraph = {
     val nonPhysicalRel = optimizedRelNodes.filterNot(_.isInstanceOf[FlinkPhysicalRel])
     if (nonPhysicalRel.nonEmpty) {
       throw new TableException(
@@ -338,7 +339,7 @@ abstract class PlannerBase(
     val reusedPlan = SubplanReuser.reuseDuplicatedSubplan(relsWithoutSameObj, tableConfig)
     // convert FlinkPhysicalRel DAG to ExecNodeGraph
     val generator = new ExecNodeGraphGenerator()
-    val execGraph = generator.generate(reusedPlan.map(_.asInstanceOf[FlinkPhysicalRel]))
+    val execGraph = generator.generate(reusedPlan.map(_.asInstanceOf[FlinkPhysicalRel]), isCompiled)
 
     // process the graph
     val context = new ProcessorContext(this)
@@ -509,8 +510,7 @@ abstract class PlannerBase(
       case o => throw new TableException(s"Unsupported operation: ${o.getClass.getCanonicalName}")
     }
     val optimizedRelNodes = optimize(sinkRelNodes)
-    val execGraph = translateToExecNodeGraph(optimizedRelNodes)
-
+    val execGraph = translateToExecNodeGraph(optimizedRelNodes, isCompiled = false)
     val transformations = translateToPlan(execGraph)
     afterTranslation()
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
index d00c1f9674a..fa59eabfa80 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala
@@ -170,7 +170,7 @@ class StreamPlanner(
     beforeTranslation()
     val relNodes = modifyOperations.map(translateToRel)
     val optimizedRelNodes = optimize(relNodes)
-    val execGraph = translateToExecNodeGraph(optimizedRelNodes)
+    val execGraph = translateToExecNodeGraph(optimizedRelNodes, isCompiled = true)
     afterTranslation()
 
     new ExecNodeGraphInternalPlan(
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/FlinkPhysicalRel.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/FlinkPhysicalRel.scala
index 3857bb4d783..7a8d4c5c52e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/FlinkPhysicalRel.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/FlinkPhysicalRel.scala
@@ -39,6 +39,22 @@ trait FlinkPhysicalRel extends FlinkRelNode {
    */
   def satisfyTraits(requiredTraitSet: RelTraitSet): Option[RelNode] = None
 
+  /**
+   * Translate this physical RelNode into an [[ExecNode]].
+   *
+   * NOTE: This method only needs to create the corresponding ExecNode, the connection to its
+   * input/output nodes will be done by ExecGraphGenerator. Because some physical rels need not be
+   * translated to a real ExecNode, such as Exchange will be translated to edge in the future.
+   *
+   * @param isCompiled
+   *   Whether the translation happens as part of a plan compilation.
+   */
+  def translateToExecNode(isCompiled: Boolean): ExecNode[_] = {
+    val execNode = translateToExecNode()
+    execNode.setCompiled(isCompiled)
+    execNode
+  }
+
   /**
    * Translate this physical RelNode into an [[ExecNode]].
    *
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/internal/CompiledPlanUtils.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/internal/CompiledPlanUtils.java
index d9fcf846b99..2382b44c659 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/internal/CompiledPlanUtils.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/internal/CompiledPlanUtils.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.api.internal;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.table.api.CompiledPlan;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.planner.delegation.PlannerBase;
 import org.apache.flink.table.planner.plan.ExecNodeGraphInternalPlan;
 
 import java.util.List;
@@ -40,7 +39,6 @@ public class CompiledPlanUtils {
     public static List<Transformation<?>> toTransformations(
             TableEnvironment env, CompiledPlan compiledPlan) {
         final ExecNodeGraphInternalPlan internalPlan = unwrap(compiledPlan);
-        return ((PlannerBase) ((TableEnvironmentImpl) env).getPlanner())
-                .translateToPlan(internalPlan.getExecNodeGraph());
+        return ((TableEnvironmentImpl) env).getPlanner().translatePlan(internalPlan);
     }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TestingBatchExecNode.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TestingBatchExecNode.java
index 7b8cfe39ddc..0f06d0877eb 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TestingBatchExecNode.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TestingBatchExecNode.java
@@ -106,6 +106,11 @@ public class TestingBatchExecNode implements BatchExecNode<RowData> {
         visitor.visit(this);
     }
 
+    @Override
+    public void setCompiled(boolean isCompiled) {
+        throw new TableException("Unsupported operation.");
+    }
+
     @Override
     public String toString() {
         return description;
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
index b0667b14ff9..20cf5080f90 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
@@ -26,12 +26,12 @@ import org.apache.flink.streaming.api.transformations.WithBoundedness;
 import org.apache.flink.table.api.CompiledPlan;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.PlanReference;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableDescriptor;
-import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.internal.CompiledPlanUtils;
 
 import org.junit.jupiter.api.Test;
@@ -39,8 +39,15 @@ import org.junit.jupiter.api.parallel.Execution;
 import org.junit.jupiter.api.parallel.ExecutionMode;
 
 import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS;
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_UID_GENERATION;
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.UidGeneration.ALWAYS;
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.UidGeneration.DISABLED;
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.UidGeneration.PLAN_ONLY;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.InstanceOfAssertFactories.type;
 
@@ -108,10 +115,28 @@ class TransformationsTest {
     }
 
     @Test
-    public void testLegacyUid() {
-        final TableEnvironment env =
-                TableEnvironment.create(EnvironmentSettings.inStreamingMode().getConfiguration());
-        env.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS, true);
+    public void testUidGeneration() {
+        checkUids(c -> c.set(TABLE_EXEC_UID_GENERATION, PLAN_ONLY), true, false);
+        checkUids(c -> c.set(TABLE_EXEC_UID_GENERATION, ALWAYS), true, true);
+        checkUids(c -> c.set(TABLE_EXEC_UID_GENERATION, DISABLED), false, false);
+        checkUids(
+                c -> {
+                    c.set(TABLE_EXEC_UID_GENERATION, PLAN_ONLY);
+                    c.set(TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS, true);
+                },
+                false,
+                false);
+    }
+
+    private static void checkUids(
+            Consumer<TableConfig> config,
+            boolean expectUidWithCompilation,
+            boolean expectUidWithoutCompilation) {
+        final StreamTableEnvironment env =
+                StreamTableEnvironment.create(
+                        StreamExecutionEnvironment.getExecutionEnvironment(),
+                        EnvironmentSettings.newInstance().inStreamingMode().build());
+        config.accept(env.getConfig());
 
         env.createTemporaryTable(
                 "source_table",
@@ -122,24 +147,49 @@ class TransformationsTest {
         env.createTemporaryTable(
                 "sink_table", TableDescriptor.forConnector("values").schema(dummySchema()).build());
 
-        final CompiledPlan compiledPlan =
-                env.from("source_table")
-                        .select($("i").abs())
-                        .insertInto("sink_table")
-                        .compilePlan();
-
-        // There should be 3 transformations in this list: sink -> calc -> source
-        final List<Transformation<?>> transformations =
-                CompiledPlanUtils.toTransformations(env, compiledPlan)
-                        .get(0)
-                        .getTransitivePredecessors();
-        assertThat(transformations).hasSize(3);
-
-        // As the sink and source might set the uid, we only check the calc transformation.
-        assertThat(transformations)
-                .element(1, type(Transformation.class))
-                .extracting(Transformation::getUid)
-                .isNull();
+        // There should be 3 transformations: sink -> calc -> source
+        final Table table = env.from("source_table").select($("i").abs());
+
+        // Uses in-memory ExecNodes
+        final CompiledPlan memoryPlan = table.insertInto("sink_table").compilePlan();
+        final List<String> memoryUids =
+                CompiledPlanUtils.toTransformations(env, memoryPlan).get(0)
+                        .getTransitivePredecessors().stream()
+                        .map(Transformation::getUid)
+                        .collect(Collectors.toList());
+        assertThat(memoryUids).hasSize(3);
+        if (expectUidWithCompilation) {
+            assertThat(memoryUids).allSatisfy(u -> assertThat(u).isNotNull());
+        } else {
+            assertThat(memoryUids).allSatisfy(u -> assertThat(u).isNull());
+        }
+
+        // Uses deserialized ExecNodes
+        final String jsonPlan = table.insertInto("sink_table").compilePlan().asJsonString();
+        final List<String> jsonUids =
+                CompiledPlanUtils.toTransformations(
+                                env, env.loadPlan(PlanReference.fromJsonString(jsonPlan)))
+                        .get(0).getTransitivePredecessors().stream()
+                        .map(Transformation::getUid)
+                        .collect(Collectors.toList());
+        assertThat(jsonUids).hasSize(3);
+        if (expectUidWithCompilation) {
+            assertThat(jsonUids).allSatisfy(u -> assertThat(u).isNotNull());
+        } else {
+            assertThat(jsonUids).allSatisfy(u -> assertThat(u).isNull());
+        }
+
+        final List<String> inlineUids =
+                env.toChangelogStream(table).getTransformation().getTransitivePredecessors()
+                        .stream()
+                        .map(Transformation::getUid)
+                        .collect(Collectors.toList());
+        assertThat(inlineUids).hasSize(3);
+        if (expectUidWithoutCompilation) {
+            assertThat(inlineUids).allSatisfy(u -> assertThat(u).isNotNull());
+        } else {
+            assertThat(inlineUids).allSatisfy(u -> assertThat(u).isNull());
+        }
     }
 
     // --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java
index 1c27bbb8c76..04d27f3ea5a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessorTest.java
@@ -104,7 +104,8 @@ public class MultipleInputNodeCreationProcessorTest extends TableTestBase {
         RelNode relNode = TableTestUtil.toRelNode(table);
         FlinkPhysicalRel optimizedRel = (FlinkPhysicalRel) util.getPlanner().optimize(relNode);
         ExecNodeGraphGenerator generator = new ExecNodeGraphGenerator();
-        ExecNodeGraph execGraph = generator.generate(Collections.singletonList(optimizedRel));
+        ExecNodeGraph execGraph =
+                generator.generate(Collections.singletonList(optimizedRel), false);
         ExecNode<?> execNode = execGraph.getRootNodes().get(0);
         while (!execNode.getInputEdges().isEmpty()) {
             execNode = execNode.getInputEdges().get(0).getSource();
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/InternalConfigOptionsTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/InternalConfigOptionsTest.java
index 532a0939f3f..4b3c1061416 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/InternalConfigOptionsTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/InternalConfigOptionsTest.java
@@ -72,7 +72,8 @@ public class InternalConfigOptionsTest extends TableTestBase {
                 tEnv.sqlQuery("SELECT LOCALTIME, LOCALTIMESTAMP, CURRENT_TIME, CURRENT_TIMESTAMP");
         RelNode relNode = planner.optimize(TableTestUtil.toRelNode(table));
         ExecNodeGraph execNodeGraph =
-                planner.translateToExecNodeGraph(toScala(Collections.singletonList(relNode)));
+                planner.translateToExecNodeGraph(
+                        toScala(Collections.singletonList(relNode)), false);
         // PlannerBase#translateToExecNodeGraph will set internal temporal configurations and
         // cleanup them after translate finished
         List<Transformation<?>> transformation = planner.translateToPlan(execNodeGraph);
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 58ab346d842..707e6773627 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -42,7 +42,7 @@ import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.factories.{FactoryUtil, PlannerFactoryUtil, StreamTableSourceFactory}
 import org.apache.flink.table.functions._
 import org.apache.flink.table.module.ModuleManager
-import org.apache.flink.table.operations.{ModifyOperation, Operation, QueryOperation, SinkModifyOperation}
+import org.apache.flink.table.operations.{ModifyOperation, QueryOperation}
 import org.apache.flink.table.planner.calcite.CalciteConfig
 import org.apache.flink.table.planner.delegation.PlannerBase
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
@@ -955,7 +955,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isStreamingMode: Boolean)
 
     // build optimized exec plan if `expectedPlans` contains OPT_EXEC
     val optimizedExecPlan = if (expectedPlans.contains(PlanKind.OPT_EXEC)) {
-      val execGraph = getPlanner.translateToExecNodeGraph(optimizedRels)
+      val execGraph = getPlanner.translateToExecNodeGraph(optimizedRels, isCompiled = false)
       System.lineSeparator + ExecNodePlanDumper.dagToString(execGraph)
     } else {
       ""