You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/02/04 08:02:20 UTC

[flink] 02/10: [hotfix] Removes unused classloader parameter from CheckpointRecoveryFactory.createRecoveredCompletedCheckpointStore

This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 50be2480b29da6fb21d7033a86bef08b6a4a2ea5
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Thu Nov 25 12:54:06 2021 +0100

    [hotfix] Removes unused classloader parameter from CheckpointRecoveryFactory.createRecoveredCompletedCheckpointStore
---
 .../highavailability/KubernetesCheckpointRecoveryFactory.java  |  1 -
 .../flink/runtime/checkpoint/CheckpointRecoveryFactory.java    |  2 --
 .../runtime/checkpoint/PerJobCheckpointRecoveryFactory.java    |  1 -
 .../checkpoint/StandaloneCheckpointRecoveryFactory.java        |  1 -
 .../runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java |  1 -
 .../org/apache/flink/runtime/scheduler/DefaultScheduler.java   |  1 -
 .../java/org/apache/flink/runtime/scheduler/SchedulerBase.java |  2 --
 .../org/apache/flink/runtime/scheduler/SchedulerUtils.java     | 10 +---------
 .../flink/runtime/scheduler/adaptive/AdaptiveScheduler.java    |  7 +------
 .../flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java |  6 ------
 .../runtime/checkpoint/TestingCheckpointRecoveryFactory.java   |  1 -
 .../org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java |  3 ---
 12 files changed, 2 insertions(+), 34 deletions(-)

diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java
index 2c31864..ea78ecb 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java
@@ -80,7 +80,6 @@ public class KubernetesCheckpointRecoveryFactory implements CheckpointRecoveryFa
     public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
             JobID jobID,
             int maxNumberOfCheckpointsToRetain,
-            ClassLoader userClassLoader,
             SharedStateRegistryFactory sharedStateRegistryFactory,
             Executor ioExecutor)
             throws Exception {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
index 81d7009..ab0b5ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
@@ -34,7 +34,6 @@ public interface CheckpointRecoveryFactory {
      *
      * @param jobId Job ID to recover checkpoints for
      * @param maxNumberOfCheckpointsToRetain Maximum number of checkpoints to retain
-     * @param userClassLoader User code class loader of the job
      * @param sharedStateRegistryFactory Simple factory to produce {@link SharedStateRegistry}
      *     objects.
      * @param ioExecutor Executor used to run (async) deletes.
@@ -43,7 +42,6 @@ public interface CheckpointRecoveryFactory {
     CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
             JobID jobId,
             int maxNumberOfCheckpointsToRetain,
-            ClassLoader userClassLoader,
             SharedStateRegistryFactory sharedStateRegistryFactory,
             Executor ioExecutor)
             throws Exception;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
index 83ce077..bc18c45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
@@ -74,7 +74,6 @@ public class PerJobCheckpointRecoveryFactory<T extends CompletedCheckpointStore>
     public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
             JobID jobId,
             int maxNumberOfCheckpointsToRetain,
-            ClassLoader userClassLoader,
             SharedStateRegistryFactory sharedStateRegistryFactory,
             Executor ioExecutor) {
         return store.compute(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
index a77c6fb..95f9da7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
@@ -31,7 +31,6 @@ public class StandaloneCheckpointRecoveryFactory implements CheckpointRecoveryFa
     public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
             JobID jobId,
             int maxNumberOfCheckpointsToRetain,
-            ClassLoader userClassLoader,
             SharedStateRegistryFactory sharedStateRegistryFactory,
             Executor ioExecutor)
             throws Exception {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
index e38810d..052b340 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
@@ -50,7 +50,6 @@ public class ZooKeeperCheckpointRecoveryFactory implements CheckpointRecoveryFac
     public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
             JobID jobId,
             int maxNumberOfCheckpointsToRetain,
-            ClassLoader userClassLoader,
             SharedStateRegistryFactory sharedStateRegistryFactory,
             Executor ioExecutor)
             throws Exception {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index 1d0f3b2..3af4b61 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -193,7 +193,6 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
                 jobGraph,
                 ioExecutor,
                 jobMasterConfiguration,
-                userCodeLoader,
                 checkpointsCleaner,
                 checkpointRecoveryFactory,
                 jobManagerJobMetricGroup,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 46af6e0..45a257d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -168,7 +168,6 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling
             final JobGraph jobGraph,
             final Executor ioExecutor,
             final Configuration jobMasterConfiguration,
-            final ClassLoader userCodeLoader,
             final CheckpointsCleaner checkpointsCleaner,
             final CheckpointRecoveryFactory checkpointRecoveryFactory,
             final JobManagerJobMetricGroup jobManagerJobMetricGroup,
@@ -193,7 +192,6 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling
                 SchedulerUtils.createCompletedCheckpointStoreIfCheckpointingIsEnabled(
                         jobGraph,
                         jobMasterConfiguration,
-                        userCodeLoader,
                         checkNotNull(checkpointRecoveryFactory),
                         ioExecutor,
                         log);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
index 5f61304..ab807ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
@@ -47,7 +47,6 @@ public final class SchedulerUtils {
     public static CompletedCheckpointStore createCompletedCheckpointStoreIfCheckpointingIsEnabled(
             JobGraph jobGraph,
             Configuration configuration,
-            ClassLoader userCodeLoader,
             CheckpointRecoveryFactory checkpointRecoveryFactory,
             Executor ioExecutor,
             Logger log)
@@ -56,12 +55,7 @@ public final class SchedulerUtils {
         if (DefaultExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) {
             try {
                 return createCompletedCheckpointStore(
-                        configuration,
-                        userCodeLoader,
-                        checkpointRecoveryFactory,
-                        ioExecutor,
-                        log,
-                        jobId);
+                        configuration, checkpointRecoveryFactory, ioExecutor, log, jobId);
             } catch (Exception e) {
                 throw new JobExecutionException(
                         jobId,
@@ -76,7 +70,6 @@ public final class SchedulerUtils {
     @VisibleForTesting
     static CompletedCheckpointStore createCompletedCheckpointStore(
             Configuration jobManagerConfig,
-            ClassLoader classLoader,
             CheckpointRecoveryFactory recoveryFactory,
             Executor ioExecutor,
             Logger log,
@@ -101,7 +94,6 @@ public final class SchedulerUtils {
         return recoveryFactory.createRecoveredCompletedCheckpointStore(
                 jobId,
                 maxNumberOfCheckpointsToRetain,
-                classLoader,
                 SharedStateRegistry.DEFAULT_FACTORY,
                 ioExecutor);
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 8dfa51a..2276953 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -246,12 +246,7 @@ public class AdaptiveScheduler
         this.checkpointsCleaner = checkpointsCleaner;
         this.completedCheckpointStore =
                 SchedulerUtils.createCompletedCheckpointStoreIfCheckpointingIsEnabled(
-                        jobGraph,
-                        configuration,
-                        userCodeClassLoader,
-                        checkpointRecoveryFactory,
-                        ioExecutor,
-                        LOG);
+                        jobGraph, configuration, checkpointRecoveryFactory, ioExecutor, LOG);
         this.checkpointIdCounter =
                 SchedulerUtils.createCheckpointIDCounterIfCheckpointingIsEnabled(
                         jobGraph, checkpointRecoveryFactory);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java
index acd9d80..2fe1736 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java
@@ -40,15 +40,12 @@ public class PerJobCheckpointRecoveryTest extends TestLogger {
         final CheckpointRecoveryFactory factory =
                 PerJobCheckpointRecoveryFactory.withoutCheckpointStoreRecovery(
                         maxCheckpoints -> store);
-        final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
-
         final JobID firstJobId = new JobID();
         assertSame(
                 store,
                 factory.createRecoveredCompletedCheckpointStore(
                         firstJobId,
                         1,
-                        classLoader,
                         SharedStateRegistry.DEFAULT_FACTORY,
                         Executors.directExecutor()));
         assertThrows(
@@ -57,7 +54,6 @@ public class PerJobCheckpointRecoveryTest extends TestLogger {
                         factory.createRecoveredCompletedCheckpointStore(
                                 firstJobId,
                                 1,
-                                classLoader,
                                 SharedStateRegistry.DEFAULT_FACTORY,
                                 Executors.directExecutor()));
 
@@ -67,7 +63,6 @@ public class PerJobCheckpointRecoveryTest extends TestLogger {
                 factory.createRecoveredCompletedCheckpointStore(
                         secondJobId,
                         1,
-                        classLoader,
                         SharedStateRegistry.DEFAULT_FACTORY,
                         Executors.directExecutor()));
         assertThrows(
@@ -76,7 +71,6 @@ public class PerJobCheckpointRecoveryTest extends TestLogger {
                         factory.createRecoveredCompletedCheckpointStore(
                                 secondJobId,
                                 1,
-                                classLoader,
                                 SharedStateRegistry.DEFAULT_FACTORY,
                                 Executors.directExecutor()));
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
index c1e8514..e164543 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
@@ -38,7 +38,6 @@ public class TestingCheckpointRecoveryFactory implements CheckpointRecoveryFacto
     public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
             JobID jobId,
             int maxNumberOfCheckpointsToRetain,
-            ClassLoader userClassLoader,
             SharedStateRegistryFactory sharedStateRegistryFactory,
             Executor ioExecutor) {
         return store;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
index 8f5bffc..641a918 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
@@ -75,7 +75,6 @@ public class SchedulerUtilsTest extends TestLogger {
         final CompletedCheckpointStore completedCheckpointStore =
                 SchedulerUtils.createCompletedCheckpointStore(
                         jobManagerConfig,
-                        getClass().getClassLoader(),
                         new StandaloneCheckpointRecoveryFactory(),
                         Executors.directExecutor(),
                         log,
@@ -102,7 +101,6 @@ public class SchedulerUtilsTest extends TestLogger {
         CompletedCheckpointStore checkpointStore =
                 SchedulerUtils.createCompletedCheckpointStore(
                         new Configuration(),
-                        getClass().getClassLoader(),
                         recoveryFactory,
                         Executors.directExecutor(),
                         log,
@@ -123,7 +121,6 @@ public class SchedulerUtilsTest extends TestLogger {
             public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
                     JobID jobId,
                     int maxNumberOfCheckpointsToRetain,
-                    ClassLoader userClassLoader,
                     SharedStateRegistryFactory sharedStateRegistryFactory,
                     Executor ioExecutor) {
                 List<CompletedCheckpoint> checkpoints = singletonList(checkpoint);