You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/09/06 10:04:06 UTC

[flink] branch master updated: [FLINK-28897][runtime] Fix bug of failed to generate JobGraph when using UDF and enable checkpoint

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 97f5a45cd03 [FLINK-28897][runtime] Fix bug of failed to generate JobGraph when using UDF and enable checkpoint
97f5a45cd03 is described below

commit 97f5a45cd035fbae37a7468c6f771451ddb4a0a4
Author: Ron <ld...@163.com>
AuthorDate: Tue Sep 6 18:03:54 2022 +0800

    [FLINK-28897][runtime] Fix bug of failed to generate JobGraph when using UDF and enable checkpoint
    
    This closes #20713
---
 .../flink/client/FlinkPipelineTranslationUtil.java | 21 ++++++++----
 .../apache/flink/client/StreamGraphTranslator.java |  8 ++++-
 .../org/apache/flink/client/cli/CliFrontend.java   |  4 ++-
 .../application/executors/EmbeddedExecutor.java    |  3 +-
 .../executors/AbstractJobClusterExecutor.java      |  3 +-
 .../executors/AbstractSessionClusterExecutor.java  |  3 +-
 .../client/deployment/executors/LocalExecutor.java |  7 ++--
 .../executors/PipelineExecutorUtils.java           | 12 +++++--
 .../client/cli/CliFrontendPackageProgramTest.java  |  3 +-
 .../apache/flink/client/program/ClientTest.java    | 10 ++++--
 .../DefaultPackagedProgramRetrieverTest.java       |  3 +-
 .../table/sql/codegen/UsingRemoteJarITCase.java    | 11 +++++++
 .../src/test/resources/scalar_udf_e2e.sql          | 37 ++++++++++++++++++++++
 .../state/api/runtime/OperatorIDGeneratorTest.java |  3 +-
 .../flink/streaming/api/graph/StreamGraph.java     |  7 ++--
 .../api/graph/StreamingJobGraphGenerator.java      | 24 ++++++++++----
 .../MiniClusterPipelineExecutorServiceLoader.java  |  3 +-
 .../checkpointing/ChangelogRecoveryITCaseBase.java |  2 +-
 .../RescaleCheckpointManuallyITCase.java           |  2 +-
 19 files changed, 131 insertions(+), 35 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java
index 67e8e724d21..eedf204676e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java
@@ -32,9 +32,13 @@ public final class FlinkPipelineTranslationUtil {
 
     /** Transmogrifies the given {@link Pipeline} to a {@link JobGraph}. */
     public static JobGraph getJobGraph(
-            Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism) {
+            ClassLoader userClassloader,
+            Pipeline pipeline,
+            Configuration optimizerConfiguration,
+            int defaultParallelism) {
 
-        FlinkPipelineTranslator pipelineTranslator = getPipelineTranslator(pipeline);
+        FlinkPipelineTranslator pipelineTranslator =
+                getPipelineTranslator(userClassloader, pipeline);
 
         return pipelineTranslator.translateToJobGraph(
                 pipeline, optimizerConfiguration, defaultParallelism);
@@ -52,26 +56,29 @@ public final class FlinkPipelineTranslationUtil {
         try {
             Thread.currentThread().setContextClassLoader(userClassloader);
             return FlinkPipelineTranslationUtil.getJobGraph(
-                    pipeline, configuration, defaultParallelism);
+                    userClassloader, pipeline, configuration, defaultParallelism);
         } finally {
             Thread.currentThread().setContextClassLoader(contextClassLoader);
         }
     }
 
     /** Extracts the execution plan (as JSON) from the given {@link Pipeline}. */
-    public static String translateToJSONExecutionPlan(Pipeline pipeline) {
-        FlinkPipelineTranslator pipelineTranslator = getPipelineTranslator(pipeline);
+    public static String translateToJSONExecutionPlan(
+            ClassLoader userClassloader, Pipeline pipeline) {
+        FlinkPipelineTranslator pipelineTranslator =
+                getPipelineTranslator(userClassloader, pipeline);
         return pipelineTranslator.translateToJSONExecutionPlan(pipeline);
     }
 
-    private static FlinkPipelineTranslator getPipelineTranslator(Pipeline pipeline) {
+    private static FlinkPipelineTranslator getPipelineTranslator(
+            ClassLoader userClassloader, Pipeline pipeline) {
         PlanTranslator planTranslator = new PlanTranslator();
 
         if (planTranslator.canTranslate(pipeline)) {
             return planTranslator;
         }
 
-        StreamGraphTranslator streamGraphTranslator = new StreamGraphTranslator();
+        StreamGraphTranslator streamGraphTranslator = new StreamGraphTranslator(userClassloader);
 
         if (streamGraphTranslator.canTranslate(pipeline)) {
             return streamGraphTranslator;
diff --git a/flink-clients/src/main/java/org/apache/flink/client/StreamGraphTranslator.java b/flink-clients/src/main/java/org/apache/flink/client/StreamGraphTranslator.java
index 3e206c631a0..77e35ed9856 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/StreamGraphTranslator.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/StreamGraphTranslator.java
@@ -40,6 +40,12 @@ public class StreamGraphTranslator implements FlinkPipelineTranslator {
 
     private static final Logger LOG = LoggerFactory.getLogger(StreamGraphTranslator.class);
 
+    private final ClassLoader userClassloader;
+
+    public StreamGraphTranslator(ClassLoader userClassloader) {
+        this.userClassloader = userClassloader;
+    }
+
     @Override
     public JobGraph translateToJobGraph(
             Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism) {
@@ -47,7 +53,7 @@ public class StreamGraphTranslator implements FlinkPipelineTranslator {
                 pipeline instanceof StreamGraph, "Given pipeline is not a DataStream StreamGraph.");
 
         StreamGraph streamGraph = (StreamGraph) pipeline;
-        return streamGraph.getJobGraph(null);
+        return streamGraph.getJobGraph(userClassloader, null);
     }
 
     @Override
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 652df51c292..f603dc90cda 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -355,7 +355,9 @@ public class CliFrontend {
             Pipeline pipeline =
                     PackagedProgramUtils.getPipelineFromProgram(
                             program, effectiveConfiguration, parallelism, true);
-            String jsonPlan = FlinkPipelineTranslationUtil.translateToJSONExecutionPlan(pipeline);
+            String jsonPlan =
+                    FlinkPipelineTranslationUtil.translateToJSONExecutionPlan(
+                            program.getUserCodeClassLoader(), pipeline);
 
             if (jsonPlan != null) {
                 System.out.println(
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
index 92995f350c2..7094c1cac13 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
@@ -119,7 +119,8 @@ public class EmbeddedExecutor implements PipelineExecutor {
         final Time timeout =
                 Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
 
-        final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
+        final JobGraph jobGraph =
+                PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassloader);
         final JobID actualJobId = jobGraph.getJobID();
 
         this.submittedJobIds.add(actualJobId);
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractJobClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractJobClusterExecutor.java
index 6050488772e..f43e9ef46f9 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractJobClusterExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractJobClusterExecutor.java
@@ -68,7 +68,8 @@ public class AbstractJobClusterExecutor<
             @Nonnull final Configuration configuration,
             @Nonnull final ClassLoader userCodeClassloader)
             throws Exception {
-        final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
+        final JobGraph jobGraph =
+                PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassloader);
 
         try (final ClusterDescriptor<ClusterID> clusterDescriptor =
                 clusterClientFactory.createClusterDescriptor(configuration)) {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java
index eeff536de51..52c58e5d035 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java
@@ -68,7 +68,8 @@ public class AbstractSessionClusterExecutor<
             @Nonnull final Configuration configuration,
             @Nonnull final ClassLoader userCodeClassloader)
             throws Exception {
-        final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
+        final JobGraph jobGraph =
+                PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassloader);
 
         try (final ClusterDescriptor<ClusterID> clusterDescriptor =
                 clusterClientFactory.createClusterDescriptor(configuration)) {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java
index 71f60ff4296..45847a6a62d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java
@@ -79,13 +79,14 @@ public class LocalExecutor implements PipelineExecutor {
         // we only support attached execution with the local executor.
         checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));
 
-        final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig);
+        final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig, userCodeClassloader);
 
         return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory)
                 .submitJob(jobGraph, userCodeClassloader);
     }
 
-    private JobGraph getJobGraph(Pipeline pipeline, Configuration configuration)
+    private JobGraph getJobGraph(
+            Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader)
             throws MalformedURLException {
         // This is a quirk in how LocalEnvironment used to work. It sets the default parallelism
         // to <num taskmanagers> * <num task slots>. Might be questionable but we keep the behaviour
@@ -101,6 +102,6 @@ public class LocalExecutor implements PipelineExecutor {
             plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);
         }
 
-        return PipelineExecutorUtils.getJobGraph(pipeline, configuration);
+        return PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassloader);
     }
 }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
index 06b1ada7d4f..76c5d67cc40 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
@@ -38,14 +38,17 @@ public class PipelineExecutorUtils {
     /**
      * Creates the {@link JobGraph} corresponding to the provided {@link Pipeline}.
      *
-     * @param pipeline the pipeline whose job graph we are computing
+     * @param pipeline the pipeline whose job graph we are computing.
      * @param configuration the configuration with the necessary information such as jars and
      *     classpaths to be included, the parallelism of the job and potential savepoint settings
      *     used to bootstrap its state.
+     * @param userClassloader the classloader which can load user classes.
      * @return the corresponding {@link JobGraph}.
      */
     public static JobGraph getJobGraph(
-            @Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration)
+            @Nonnull final Pipeline pipeline,
+            @Nonnull final Configuration configuration,
+            @Nonnull ClassLoader userClassloader)
             throws MalformedURLException {
         checkNotNull(pipeline);
         checkNotNull(configuration);
@@ -54,7 +57,10 @@ public class PipelineExecutorUtils {
                 ExecutionConfigAccessor.fromConfiguration(configuration);
         final JobGraph jobGraph =
                 FlinkPipelineTranslationUtil.getJobGraph(
-                        pipeline, configuration, executionConfigAccessor.getParallelism());
+                        userClassloader,
+                        pipeline,
+                        configuration,
+                        executionConfigAccessor.getParallelism());
 
         configuration
                 .getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java
index 837bc8a99e9..82f95cc08c3 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java
@@ -326,7 +326,8 @@ class CliFrontendPackageProgramTest {
 
             // we expect this to fail with a "ClassNotFoundException"
             Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(prog, c, 666, true);
-            FlinkPipelineTranslationUtil.translateToJSONExecutionPlan(pipeline);
+            FlinkPipelineTranslationUtil.translateToJSONExecutionPlan(
+                    prog.getUserCodeClassLoader(), pipeline);
             fail("Should have failed with a ClassNotFoundException");
         } catch (ProgramInvocationException e) {
             if (!(e.getCause() instanceof ClassNotFoundException)) {
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index f7a00c0fde8..b1659eb732c 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -281,7 +281,12 @@ class ClientTest {
     void shouldSubmitToJobClient() {
         final ClusterClient<?> clusterClient =
                 new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
-        JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(plan, new Configuration(), 1);
+        JobGraph jobGraph =
+                FlinkPipelineTranslationUtil.getJobGraph(
+                        Thread.currentThread().getContextClassLoader(),
+                        plan,
+                        new Configuration(),
+                        1);
 
         jobGraph.addJars(Collections.emptyList());
         jobGraph.setClasspaths(Collections.emptyList());
@@ -516,7 +521,8 @@ class ClientTest {
                     return (pipeline, config, classLoader) -> {
                         final int parallelism = config.getInteger(CoreOptions.DEFAULT_PARALLELISM);
                         final JobGraph jobGraph =
-                                FlinkPipelineTranslationUtil.getJobGraph(plan, config, parallelism);
+                                FlinkPipelineTranslationUtil.getJobGraph(
+                                        classLoader, plan, config, parallelism);
 
                         final ExecutionConfigAccessor accessor =
                                 ExecutionConfigAccessor.fromConfiguration(config);
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverTest.java
index 8cf20164fe5..cb750edbd46 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverTest.java
@@ -552,7 +552,8 @@ class DefaultPackagedProgramRetrieverTest {
         final Pipeline pipeline =
                 PackagedProgramUtils.getPipelineFromProgram(
                         packagedProgram, configuration, defaultParallelism, false);
-        return PipelineExecutorUtils.getJobGraph(pipeline, configuration);
+        return PipelineExecutorUtils.getJobGraph(
+                pipeline, configuration, packagedProgram.getUserCodeClassLoader());
     }
 
     private static List<String> extractRelativizedURLsForJarsFromDirectory(File directory)
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java
index a574d1f6459..e0cc0f00c99 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java
@@ -40,6 +40,7 @@ import java.io.IOException;
 import java.nio.file.Path;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -110,6 +111,16 @@ public class UsingRemoteJarITCase extends SqlITCaseBase {
                         "{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
     }
 
+    @Test
+    public void testScalarUdfWhenCheckpointEnable() throws Exception {
+        runAndCheckSQL(
+                "scalar_udf_e2e.sql",
+                generateReplaceVars(),
+                1,
+                Collections.singletonList(
+                        "{\"before\":null,\"after\":{\"id\":1,\"str\":\"Hello Flink\"},\"op\":\"c\"}"));
+    }
+
     @Test
     public void testCreateTemporarySystemFunctionUsingRemoteJar() throws Exception {
         Map<String, String> replaceVars = generateReplaceVars();
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/scalar_udf_e2e.sql b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/scalar_udf_e2e.sql
new file mode 100644
index 00000000000..ec2d929faa0
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/scalar_udf_e2e.sql
@@ -0,0 +1,37 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+CREATE TABLE JsonTable (
+    id INT,
+    str VARCHAR
+) WITH (
+    'connector' = 'filesystem',
+    'path' = '$RESULT',
+    'sink.rolling-policy.rollover-interval' = '2s',
+    'sink.rolling-policy.check-interval' = '2s',
+    'format' = 'debezium-json'
+);
+
+ADD JAR '$JAR_PATH';
+create function func1 as 'org.apache.flink.table.toolbox.StringRegexReplaceFunction' LANGUAGE JAVA;
+
+SET execution.runtime-mode = $MODE;
+
+INSERT INTO JsonTable
+SELECT id, func1(str, 'World', 'Flink') FROM (VALUES (1, 'Hello World')) AS T(id, str);
+
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java
index 2148c185d36..ebbd9a6ba1c 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java
@@ -59,7 +59,8 @@ public class OperatorIDGeneratorTest {
                 .disableChaining()
                 .addSink(new DiscardingSink<>());
 
-        JobGraph graph = env.getStreamGraph().getJobGraph(new JobID());
+        JobGraph graph =
+                env.getStreamGraph().getJobGraph(env.getClass().getClassLoader(), new JobID());
         JobVertex vertex =
                 StreamSupport.stream(graph.getVertices().spliterator(), false)
                         .filter(node -> node.getName().contains(OPERATOR_NAME))
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 8420539d798..1542022f200 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -1007,13 +1007,14 @@ public class StreamGraph implements Pipeline {
     }
 
     /** Gets the assembled {@link JobGraph} with a random {@link JobID}. */
+    @VisibleForTesting
     public JobGraph getJobGraph() {
-        return getJobGraph(null);
+        return getJobGraph(Thread.currentThread().getContextClassLoader(), null);
     }
 
     /** Gets the assembled {@link JobGraph} with a specified {@link JobID}. */
-    public JobGraph getJobGraph(@Nullable JobID jobID) {
-        return StreamingJobGraphGenerator.createJobGraph(this, jobID);
+    public JobGraph getJobGraph(ClassLoader userClassLoader, @Nullable JobID jobID) {
+        return StreamingJobGraphGenerator.createJobGraph(userClassLoader, this, jobID);
     }
 
     public String getStreamingPlanAsJSON() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index c50e7437959..a3a173a5d50 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -126,10 +126,16 @@ public class StreamingJobGraphGenerator {
 
     @VisibleForTesting
     public static JobGraph createJobGraph(StreamGraph streamGraph) {
-        return new StreamingJobGraphGenerator(streamGraph, null, Runnable::run).createJobGraph();
+        return new StreamingJobGraphGenerator(
+                        Thread.currentThread().getContextClassLoader(),
+                        streamGraph,
+                        null,
+                        Runnable::run)
+                .createJobGraph();
     }
 
-    public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
+    public static JobGraph createJobGraph(
+            ClassLoader userClassLoader, StreamGraph streamGraph, @Nullable JobID jobID) {
         // TODO Currently, we construct a new thread pool for the compilation of each job. In the
         // future, we may refactor the job submission framework and make it reusable across jobs.
         final ExecutorService serializationExecutor =
@@ -141,7 +147,8 @@ public class StreamingJobGraphGenerator {
                                         streamGraph.getExecutionConfig().getParallelism())),
                         new ExecutorThreadFactory("flink-operator-serialization-io"));
         try {
-            return new StreamingJobGraphGenerator(streamGraph, jobID, serializationExecutor)
+            return new StreamingJobGraphGenerator(
+                            userClassLoader, streamGraph, jobID, serializationExecutor)
                     .createJobGraph();
         } finally {
             serializationExecutor.shutdown();
@@ -150,6 +157,7 @@ public class StreamingJobGraphGenerator {
 
     // ------------------------------------------------------------------------
 
+    private final ClassLoader userClassloader;
     private final StreamGraph streamGraph;
 
     private final Map<Integer, JobVertex> jobVertices;
@@ -181,7 +189,11 @@ public class StreamingJobGraphGenerator {
     private final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs;
 
     private StreamingJobGraphGenerator(
-            StreamGraph streamGraph, @Nullable JobID jobID, Executor serializationExecutor) {
+            ClassLoader userClassloader,
+            StreamGraph streamGraph,
+            @Nullable JobID jobID,
+            Executor serializationExecutor) {
+        this.userClassloader = userClassloader;
         this.streamGraph = streamGraph;
         this.defaultStreamGraphHasher = new StreamGraphHasherV2();
         this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphUserHashHasher());
@@ -455,11 +467,11 @@ public class StreamingJobGraphGenerator {
                                 + "\nThe user can force Unaligned Checkpoints by using 'execution.checkpointing.unaligned.forced'");
             }
 
-            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
             for (StreamNode node : streamGraph.getStreamNodes()) {
                 StreamOperatorFactory operatorFactory = node.getOperatorFactory();
                 if (operatorFactory != null) {
-                    Class<?> operatorClass = operatorFactory.getStreamOperatorClass(classLoader);
+                    Class<?> operatorClass =
+                            operatorFactory.getStreamOperatorClass(userClassloader);
                     if (InputSelectable.class.isAssignableFrom(operatorClass)) {
 
                         throw new UnsupportedOperationException(
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java
index c55118202fa..c86639f76b3 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java
@@ -156,7 +156,8 @@ public class MiniClusterPipelineExecutorServiceLoader implements PipelineExecuto
         public CompletableFuture<JobClient> execute(
                 Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassLoader)
                 throws Exception {
-            final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
+            final JobGraph jobGraph =
+                    PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassLoader);
             if (jobGraph.getSavepointRestoreSettings() == SavepointRestoreSettings.none()
                     && pipeline instanceof StreamGraph) {
                 jobGraph.setSavepointRestoreSettings(
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase.java
index b63ddf53e8d..7db742bb295 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase.java
@@ -206,7 +206,7 @@ public abstract class ChangelogRecoveryITCaseBase extends TestLogger {
                                     Collector<Integer> out) {}
                         })
                 .addSink(new DiscardingSink<>());
-        return env.getStreamGraph().getJobGraph(jobId);
+        return env.getStreamGraph().getJobGraph(env.getClass().getClassLoader(), jobId);
     }
 
     protected void waitAndAssert(JobGraph jobGraph) throws Exception {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
index 3419f6c3df7..89313227c75 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
@@ -278,7 +278,7 @@ public class RescaleCheckpointManuallyITCase extends TestLogger {
 
         result.addSink(new CollectionSink<>());
 
-        return env.getStreamGraph().getJobGraph(jobID.get());
+        return env.getStreamGraph().getJobGraph(env.getClass().getClassLoader(), jobID.get());
     }
 
     private static class NotifyingDefiniteKeySource extends RichParallelSourceFunction<Integer> {