You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/09/06 10:04:25 UTC
[flink] branch release-1.16 updated: [FLINK-28897][runtime] Fix bug of failed to generate JobGraph when using UDF and enable checkpoint
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push:
new f07df6864e5 [FLINK-28897][runtime] Fix bug of failed to generate JobGraph when using UDF and enable checkpoint
f07df6864e5 is described below
commit f07df6864e5da21473bf7396774b71ee5482c290
Author: Jark Wu <ja...@apache.org>
AuthorDate: Tue Sep 6 18:04:17 2022 +0800
[FLINK-28897][runtime] Fix bug of failed to generate JobGraph when using UDF and enable checkpoint
This closes #20761
---
.../flink/client/FlinkPipelineTranslationUtil.java | 21 ++++++++----
.../apache/flink/client/StreamGraphTranslator.java | 8 ++++-
.../org/apache/flink/client/cli/CliFrontend.java | 4 ++-
.../application/executors/EmbeddedExecutor.java | 3 +-
.../executors/AbstractJobClusterExecutor.java | 3 +-
.../executors/AbstractSessionClusterExecutor.java | 3 +-
.../client/deployment/executors/LocalExecutor.java | 7 ++--
.../executors/PipelineExecutorUtils.java | 12 +++++--
.../client/cli/CliFrontendPackageProgramTest.java | 3 +-
.../apache/flink/client/program/ClientTest.java | 10 ++++--
.../DefaultPackagedProgramRetrieverTest.java | 3 +-
.../table/sql/codegen/UsingRemoteJarITCase.java | 11 +++++++
.../src/test/resources/scalar_udf_e2e.sql | 37 ++++++++++++++++++++++
.../state/api/runtime/OperatorIDGeneratorTest.java | 3 +-
.../flink/streaming/api/graph/StreamGraph.java | 7 ++--
.../api/graph/StreamingJobGraphGenerator.java | 24 ++++++++++----
.../MiniClusterPipelineExecutorServiceLoader.java | 3 +-
.../checkpointing/ChangelogRecoveryITCaseBase.java | 2 +-
.../RescaleCheckpointManuallyITCase.java | 2 +-
19 files changed, 131 insertions(+), 35 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java
index 67e8e724d21..eedf204676e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java
@@ -32,9 +32,13 @@ public final class FlinkPipelineTranslationUtil {
/** Transmogrifies the given {@link Pipeline} to a {@link JobGraph}. */
public static JobGraph getJobGraph(
- Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism) {
+ ClassLoader userClassloader,
+ Pipeline pipeline,
+ Configuration optimizerConfiguration,
+ int defaultParallelism) {
- FlinkPipelineTranslator pipelineTranslator = getPipelineTranslator(pipeline);
+ FlinkPipelineTranslator pipelineTranslator =
+ getPipelineTranslator(userClassloader, pipeline);
return pipelineTranslator.translateToJobGraph(
pipeline, optimizerConfiguration, defaultParallelism);
@@ -52,26 +56,29 @@ public final class FlinkPipelineTranslationUtil {
try {
Thread.currentThread().setContextClassLoader(userClassloader);
return FlinkPipelineTranslationUtil.getJobGraph(
- pipeline, configuration, defaultParallelism);
+ userClassloader, pipeline, configuration, defaultParallelism);
} finally {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
/** Extracts the execution plan (as JSON) from the given {@link Pipeline}. */
- public static String translateToJSONExecutionPlan(Pipeline pipeline) {
- FlinkPipelineTranslator pipelineTranslator = getPipelineTranslator(pipeline);
+ public static String translateToJSONExecutionPlan(
+ ClassLoader userClassloader, Pipeline pipeline) {
+ FlinkPipelineTranslator pipelineTranslator =
+ getPipelineTranslator(userClassloader, pipeline);
return pipelineTranslator.translateToJSONExecutionPlan(pipeline);
}
- private static FlinkPipelineTranslator getPipelineTranslator(Pipeline pipeline) {
+ private static FlinkPipelineTranslator getPipelineTranslator(
+ ClassLoader userClassloader, Pipeline pipeline) {
PlanTranslator planTranslator = new PlanTranslator();
if (planTranslator.canTranslate(pipeline)) {
return planTranslator;
}
- StreamGraphTranslator streamGraphTranslator = new StreamGraphTranslator();
+ StreamGraphTranslator streamGraphTranslator = new StreamGraphTranslator(userClassloader);
if (streamGraphTranslator.canTranslate(pipeline)) {
return streamGraphTranslator;
diff --git a/flink-clients/src/main/java/org/apache/flink/client/StreamGraphTranslator.java b/flink-clients/src/main/java/org/apache/flink/client/StreamGraphTranslator.java
index 3e206c631a0..77e35ed9856 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/StreamGraphTranslator.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/StreamGraphTranslator.java
@@ -40,6 +40,12 @@ public class StreamGraphTranslator implements FlinkPipelineTranslator {
private static final Logger LOG = LoggerFactory.getLogger(StreamGraphTranslator.class);
+ private final ClassLoader userClassloader;
+
+ public StreamGraphTranslator(ClassLoader userClassloader) {
+ this.userClassloader = userClassloader;
+ }
+
@Override
public JobGraph translateToJobGraph(
Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism) {
@@ -47,7 +53,7 @@ public class StreamGraphTranslator implements FlinkPipelineTranslator {
pipeline instanceof StreamGraph, "Given pipeline is not a DataStream StreamGraph.");
StreamGraph streamGraph = (StreamGraph) pipeline;
- return streamGraph.getJobGraph(null);
+ return streamGraph.getJobGraph(userClassloader, null);
}
@Override
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 652df51c292..f603dc90cda 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -355,7 +355,9 @@ public class CliFrontend {
Pipeline pipeline =
PackagedProgramUtils.getPipelineFromProgram(
program, effectiveConfiguration, parallelism, true);
- String jsonPlan = FlinkPipelineTranslationUtil.translateToJSONExecutionPlan(pipeline);
+ String jsonPlan =
+ FlinkPipelineTranslationUtil.translateToJSONExecutionPlan(
+ program.getUserCodeClassLoader(), pipeline);
if (jsonPlan != null) {
System.out.println(
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
index 92995f350c2..7094c1cac13 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
@@ -119,7 +119,8 @@ public class EmbeddedExecutor implements PipelineExecutor {
final Time timeout =
Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
- final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
+ final JobGraph jobGraph =
+ PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassloader);
final JobID actualJobId = jobGraph.getJobID();
this.submittedJobIds.add(actualJobId);
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractJobClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractJobClusterExecutor.java
index 6050488772e..f43e9ef46f9 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractJobClusterExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractJobClusterExecutor.java
@@ -68,7 +68,8 @@ public class AbstractJobClusterExecutor<
@Nonnull final Configuration configuration,
@Nonnull final ClassLoader userCodeClassloader)
throws Exception {
- final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
+ final JobGraph jobGraph =
+ PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassloader);
try (final ClusterDescriptor<ClusterID> clusterDescriptor =
clusterClientFactory.createClusterDescriptor(configuration)) {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java
index eeff536de51..52c58e5d035 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java
@@ -68,7 +68,8 @@ public class AbstractSessionClusterExecutor<
@Nonnull final Configuration configuration,
@Nonnull final ClassLoader userCodeClassloader)
throws Exception {
- final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
+ final JobGraph jobGraph =
+ PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassloader);
try (final ClusterDescriptor<ClusterID> clusterDescriptor =
clusterClientFactory.createClusterDescriptor(configuration)) {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java
index 71f60ff4296..45847a6a62d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java
@@ -79,13 +79,14 @@ public class LocalExecutor implements PipelineExecutor {
// we only support attached execution with the local executor.
checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));
- final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig);
+ final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig, userCodeClassloader);
return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory)
.submitJob(jobGraph, userCodeClassloader);
}
- private JobGraph getJobGraph(Pipeline pipeline, Configuration configuration)
+ private JobGraph getJobGraph(
+ Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader)
throws MalformedURLException {
// This is a quirk in how LocalEnvironment used to work. It sets the default parallelism
// to <num taskmanagers> * <num task slots>. Might be questionable but we keep the behaviour
@@ -101,6 +102,6 @@ public class LocalExecutor implements PipelineExecutor {
plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers);
}
- return PipelineExecutorUtils.getJobGraph(pipeline, configuration);
+ return PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassloader);
}
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
index 06b1ada7d4f..76c5d67cc40 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
@@ -38,14 +38,17 @@ public class PipelineExecutorUtils {
/**
* Creates the {@link JobGraph} corresponding to the provided {@link Pipeline}.
*
- * @param pipeline the pipeline whose job graph we are computing
+ * @param pipeline the pipeline whose job graph we are computing.
* @param configuration the configuration with the necessary information such as jars and
* classpaths to be included, the parallelism of the job and potential savepoint settings
* used to bootstrap its state.
+ * @param userClassloader the classloader which can load user classes.
* @return the corresponding {@link JobGraph}.
*/
public static JobGraph getJobGraph(
- @Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration)
+ @Nonnull final Pipeline pipeline,
+ @Nonnull final Configuration configuration,
+ @Nonnull ClassLoader userClassloader)
throws MalformedURLException {
checkNotNull(pipeline);
checkNotNull(configuration);
@@ -54,7 +57,10 @@ public class PipelineExecutorUtils {
ExecutionConfigAccessor.fromConfiguration(configuration);
final JobGraph jobGraph =
FlinkPipelineTranslationUtil.getJobGraph(
- pipeline, configuration, executionConfigAccessor.getParallelism());
+ userClassloader,
+ pipeline,
+ configuration,
+ executionConfigAccessor.getParallelism());
configuration
.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java
index 837bc8a99e9..82f95cc08c3 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java
@@ -326,7 +326,8 @@ class CliFrontendPackageProgramTest {
// we expect this to fail with a "ClassNotFoundException"
Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(prog, c, 666, true);
- FlinkPipelineTranslationUtil.translateToJSONExecutionPlan(pipeline);
+ FlinkPipelineTranslationUtil.translateToJSONExecutionPlan(
+ prog.getUserCodeClassLoader(), pipeline);
fail("Should have failed with a ClassNotFoundException");
} catch (ProgramInvocationException e) {
if (!(e.getCause() instanceof ClassNotFoundException)) {
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index f7a00c0fde8..b1659eb732c 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -281,7 +281,12 @@ class ClientTest {
void shouldSubmitToJobClient() {
final ClusterClient<?> clusterClient =
new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
- JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(plan, new Configuration(), 1);
+ JobGraph jobGraph =
+ FlinkPipelineTranslationUtil.getJobGraph(
+ Thread.currentThread().getContextClassLoader(),
+ plan,
+ new Configuration(),
+ 1);
jobGraph.addJars(Collections.emptyList());
jobGraph.setClasspaths(Collections.emptyList());
@@ -516,7 +521,8 @@ class ClientTest {
return (pipeline, config, classLoader) -> {
final int parallelism = config.getInteger(CoreOptions.DEFAULT_PARALLELISM);
final JobGraph jobGraph =
- FlinkPipelineTranslationUtil.getJobGraph(plan, config, parallelism);
+ FlinkPipelineTranslationUtil.getJobGraph(
+ classLoader, plan, config, parallelism);
final ExecutionConfigAccessor accessor =
ExecutionConfigAccessor.fromConfiguration(config);
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverTest.java
index 8cf20164fe5..cb750edbd46 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverTest.java
@@ -552,7 +552,8 @@ class DefaultPackagedProgramRetrieverTest {
final Pipeline pipeline =
PackagedProgramUtils.getPipelineFromProgram(
packagedProgram, configuration, defaultParallelism, false);
- return PipelineExecutorUtils.getJobGraph(pipeline, configuration);
+ return PipelineExecutorUtils.getJobGraph(
+ pipeline, configuration, packagedProgram.getUserCodeClassLoader());
}
private static List<String> extractRelativizedURLsForJarsFromDirectory(File directory)
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java
index a574d1f6459..e0cc0f00c99 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java
@@ -40,6 +40,7 @@ import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -110,6 +111,16 @@ public class UsingRemoteJarITCase extends SqlITCaseBase {
"{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
}
+ @Test
+ public void testScalarUdfWhenCheckpointEnable() throws Exception {
+ runAndCheckSQL(
+ "scalar_udf_e2e.sql",
+ generateReplaceVars(),
+ 1,
+ Collections.singletonList(
+ "{\"before\":null,\"after\":{\"id\":1,\"str\":\"Hello Flink\"},\"op\":\"c\"}"));
+ }
+
@Test
public void testCreateTemporarySystemFunctionUsingRemoteJar() throws Exception {
Map<String, String> replaceVars = generateReplaceVars();
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/scalar_udf_e2e.sql b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/scalar_udf_e2e.sql
new file mode 100644
index 00000000000..ec2d929faa0
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/scalar_udf_e2e.sql
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE JsonTable (
+ id INT,
+ str VARCHAR
+) WITH (
+ 'connector' = 'filesystem',
+ 'path' = '$RESULT',
+ 'sink.rolling-policy.rollover-interval' = '2s',
+ 'sink.rolling-policy.check-interval' = '2s',
+ 'format' = 'debezium-json'
+);
+
+ADD JAR '$JAR_PATH';
+create function func1 as 'org.apache.flink.table.toolbox.StringRegexReplaceFunction' LANGUAGE JAVA;
+
+SET execution.runtime-mode = $MODE;
+
+INSERT INTO JsonTable
+SELECT id, func1(str, 'World', 'Flink') FROM (VALUES (1, 'Hello World')) AS T(id, str);
+
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java
index 2148c185d36..ebbd9a6ba1c 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java
@@ -59,7 +59,8 @@ public class OperatorIDGeneratorTest {
.disableChaining()
.addSink(new DiscardingSink<>());
- JobGraph graph = env.getStreamGraph().getJobGraph(new JobID());
+ JobGraph graph =
+ env.getStreamGraph().getJobGraph(env.getClass().getClassLoader(), new JobID());
JobVertex vertex =
StreamSupport.stream(graph.getVertices().spliterator(), false)
.filter(node -> node.getName().contains(OPERATOR_NAME))
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 8420539d798..1542022f200 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -1007,13 +1007,14 @@ public class StreamGraph implements Pipeline {
}
/** Gets the assembled {@link JobGraph} with a random {@link JobID}. */
+ @VisibleForTesting
public JobGraph getJobGraph() {
- return getJobGraph(null);
+ return getJobGraph(Thread.currentThread().getContextClassLoader(), null);
}
/** Gets the assembled {@link JobGraph} with a specified {@link JobID}. */
- public JobGraph getJobGraph(@Nullable JobID jobID) {
- return StreamingJobGraphGenerator.createJobGraph(this, jobID);
+ public JobGraph getJobGraph(ClassLoader userClassLoader, @Nullable JobID jobID) {
+ return StreamingJobGraphGenerator.createJobGraph(userClassLoader, this, jobID);
}
public String getStreamingPlanAsJSON() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index c50e7437959..a3a173a5d50 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -126,10 +126,16 @@ public class StreamingJobGraphGenerator {
@VisibleForTesting
public static JobGraph createJobGraph(StreamGraph streamGraph) {
- return new StreamingJobGraphGenerator(streamGraph, null, Runnable::run).createJobGraph();
+ return new StreamingJobGraphGenerator(
+ Thread.currentThread().getContextClassLoader(),
+ streamGraph,
+ null,
+ Runnable::run)
+ .createJobGraph();
}
- public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
+ public static JobGraph createJobGraph(
+ ClassLoader userClassLoader, StreamGraph streamGraph, @Nullable JobID jobID) {
// TODO Currently, we construct a new thread pool for the compilation of each job. In the
// future, we may refactor the job submission framework and make it reusable across jobs.
final ExecutorService serializationExecutor =
@@ -141,7 +147,8 @@ public class StreamingJobGraphGenerator {
streamGraph.getExecutionConfig().getParallelism())),
new ExecutorThreadFactory("flink-operator-serialization-io"));
try {
- return new StreamingJobGraphGenerator(streamGraph, jobID, serializationExecutor)
+ return new StreamingJobGraphGenerator(
+ userClassLoader, streamGraph, jobID, serializationExecutor)
.createJobGraph();
} finally {
serializationExecutor.shutdown();
@@ -150,6 +157,7 @@ public class StreamingJobGraphGenerator {
// ------------------------------------------------------------------------
+ private final ClassLoader userClassloader;
private final StreamGraph streamGraph;
private final Map<Integer, JobVertex> jobVertices;
@@ -181,7 +189,11 @@ public class StreamingJobGraphGenerator {
private final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs;
private StreamingJobGraphGenerator(
- StreamGraph streamGraph, @Nullable JobID jobID, Executor serializationExecutor) {
+ ClassLoader userClassloader,
+ StreamGraph streamGraph,
+ @Nullable JobID jobID,
+ Executor serializationExecutor) {
+ this.userClassloader = userClassloader;
this.streamGraph = streamGraph;
this.defaultStreamGraphHasher = new StreamGraphHasherV2();
this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphUserHashHasher());
@@ -455,11 +467,11 @@ public class StreamingJobGraphGenerator {
+ "\nThe user can force Unaligned Checkpoints by using 'execution.checkpointing.unaligned.forced'");
}
- ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
for (StreamNode node : streamGraph.getStreamNodes()) {
StreamOperatorFactory operatorFactory = node.getOperatorFactory();
if (operatorFactory != null) {
- Class<?> operatorClass = operatorFactory.getStreamOperatorClass(classLoader);
+ Class<?> operatorClass =
+ operatorFactory.getStreamOperatorClass(userClassloader);
if (InputSelectable.class.isAssignableFrom(operatorClass)) {
throw new UnsupportedOperationException(
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java
index c55118202fa..c86639f76b3 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java
@@ -156,7 +156,8 @@ public class MiniClusterPipelineExecutorServiceLoader implements PipelineExecuto
public CompletableFuture<JobClient> execute(
Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassLoader)
throws Exception {
- final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
+ final JobGraph jobGraph =
+ PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassLoader);
if (jobGraph.getSavepointRestoreSettings() == SavepointRestoreSettings.none()
&& pipeline instanceof StreamGraph) {
jobGraph.setSavepointRestoreSettings(
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase.java
index b63ddf53e8d..7db742bb295 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase.java
@@ -206,7 +206,7 @@ public abstract class ChangelogRecoveryITCaseBase extends TestLogger {
Collector<Integer> out) {}
})
.addSink(new DiscardingSink<>());
- return env.getStreamGraph().getJobGraph(jobId);
+ return env.getStreamGraph().getJobGraph(env.getClass().getClassLoader(), jobId);
}
protected void waitAndAssert(JobGraph jobGraph) throws Exception {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
index 3419f6c3df7..89313227c75 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java
@@ -278,7 +278,7 @@ public class RescaleCheckpointManuallyITCase extends TestLogger {
result.addSink(new CollectionSink<>());
- return env.getStreamGraph().getJobGraph(jobID.get());
+ return env.getStreamGraph().getJobGraph(env.getClass().getClassLoader(), jobID.get());
}
private static class NotifyingDefiniteKeySource extends RichParallelSourceFunction<Integer> {