You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ma...@apache.org on 2022/02/04 08:02:20 UTC
[flink] 02/10: [hotfix] Removes unused classloader parameter from CheckpointRecoveryFactory.createRecoveredCompletedCheckpointStore
This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 50be2480b29da6fb21d7033a86bef08b6a4a2ea5
Author: Matthias Pohl <ma...@ververica.com>
AuthorDate: Thu Nov 25 12:54:06 2021 +0100
[hotfix] Removes unused classloader parameter from CheckpointRecoveryFactory.createRecoveredCompletedCheckpointStore
---
.../highavailability/KubernetesCheckpointRecoveryFactory.java | 1 -
.../flink/runtime/checkpoint/CheckpointRecoveryFactory.java | 2 --
.../runtime/checkpoint/PerJobCheckpointRecoveryFactory.java | 1 -
.../checkpoint/StandaloneCheckpointRecoveryFactory.java | 1 -
.../runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java | 1 -
.../org/apache/flink/runtime/scheduler/DefaultScheduler.java | 1 -
.../java/org/apache/flink/runtime/scheduler/SchedulerBase.java | 2 --
.../org/apache/flink/runtime/scheduler/SchedulerUtils.java | 10 +---------
.../flink/runtime/scheduler/adaptive/AdaptiveScheduler.java | 7 +------
.../flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java | 6 ------
.../runtime/checkpoint/TestingCheckpointRecoveryFactory.java | 1 -
.../org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java | 3 ---
12 files changed, 2 insertions(+), 34 deletions(-)
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java
index 2c31864..ea78ecb 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java
@@ -80,7 +80,6 @@ public class KubernetesCheckpointRecoveryFactory implements CheckpointRecoveryFa
public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
JobID jobID,
int maxNumberOfCheckpointsToRetain,
- ClassLoader userClassLoader,
SharedStateRegistryFactory sharedStateRegistryFactory,
Executor ioExecutor)
throws Exception {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
index 81d7009..ab0b5ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
@@ -34,7 +34,6 @@ public interface CheckpointRecoveryFactory {
*
* @param jobId Job ID to recover checkpoints for
* @param maxNumberOfCheckpointsToRetain Maximum number of checkpoints to retain
- * @param userClassLoader User code class loader of the job
* @param sharedStateRegistryFactory Simple factory to produce {@link SharedStateRegistry}
* objects.
* @param ioExecutor Executor used to run (async) deletes.
@@ -43,7 +42,6 @@ public interface CheckpointRecoveryFactory {
CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
JobID jobId,
int maxNumberOfCheckpointsToRetain,
- ClassLoader userClassLoader,
SharedStateRegistryFactory sharedStateRegistryFactory,
Executor ioExecutor)
throws Exception;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
index 83ce077..bc18c45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
@@ -74,7 +74,6 @@ public class PerJobCheckpointRecoveryFactory<T extends CompletedCheckpointStore>
public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
JobID jobId,
int maxNumberOfCheckpointsToRetain,
- ClassLoader userClassLoader,
SharedStateRegistryFactory sharedStateRegistryFactory,
Executor ioExecutor) {
return store.compute(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
index a77c6fb..95f9da7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
@@ -31,7 +31,6 @@ public class StandaloneCheckpointRecoveryFactory implements CheckpointRecoveryFa
public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
JobID jobId,
int maxNumberOfCheckpointsToRetain,
- ClassLoader userClassLoader,
SharedStateRegistryFactory sharedStateRegistryFactory,
Executor ioExecutor)
throws Exception {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
index e38810d..052b340 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
@@ -50,7 +50,6 @@ public class ZooKeeperCheckpointRecoveryFactory implements CheckpointRecoveryFac
public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
JobID jobId,
int maxNumberOfCheckpointsToRetain,
- ClassLoader userClassLoader,
SharedStateRegistryFactory sharedStateRegistryFactory,
Executor ioExecutor)
throws Exception {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index 1d0f3b2..3af4b61 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -193,7 +193,6 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
jobGraph,
ioExecutor,
jobMasterConfiguration,
- userCodeLoader,
checkpointsCleaner,
checkpointRecoveryFactory,
jobManagerJobMetricGroup,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 46af6e0..45a257d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -168,7 +168,6 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling
final JobGraph jobGraph,
final Executor ioExecutor,
final Configuration jobMasterConfiguration,
- final ClassLoader userCodeLoader,
final CheckpointsCleaner checkpointsCleaner,
final CheckpointRecoveryFactory checkpointRecoveryFactory,
final JobManagerJobMetricGroup jobManagerJobMetricGroup,
@@ -193,7 +192,6 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling
SchedulerUtils.createCompletedCheckpointStoreIfCheckpointingIsEnabled(
jobGraph,
jobMasterConfiguration,
- userCodeLoader,
checkNotNull(checkpointRecoveryFactory),
ioExecutor,
log);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
index 5f61304..ab807ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
@@ -47,7 +47,6 @@ public final class SchedulerUtils {
public static CompletedCheckpointStore createCompletedCheckpointStoreIfCheckpointingIsEnabled(
JobGraph jobGraph,
Configuration configuration,
- ClassLoader userCodeLoader,
CheckpointRecoveryFactory checkpointRecoveryFactory,
Executor ioExecutor,
Logger log)
@@ -56,12 +55,7 @@ public final class SchedulerUtils {
if (DefaultExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) {
try {
return createCompletedCheckpointStore(
- configuration,
- userCodeLoader,
- checkpointRecoveryFactory,
- ioExecutor,
- log,
- jobId);
+ configuration, checkpointRecoveryFactory, ioExecutor, log, jobId);
} catch (Exception e) {
throw new JobExecutionException(
jobId,
@@ -76,7 +70,6 @@ public final class SchedulerUtils {
@VisibleForTesting
static CompletedCheckpointStore createCompletedCheckpointStore(
Configuration jobManagerConfig,
- ClassLoader classLoader,
CheckpointRecoveryFactory recoveryFactory,
Executor ioExecutor,
Logger log,
@@ -101,7 +94,6 @@ public final class SchedulerUtils {
return recoveryFactory.createRecoveredCompletedCheckpointStore(
jobId,
maxNumberOfCheckpointsToRetain,
- classLoader,
SharedStateRegistry.DEFAULT_FACTORY,
ioExecutor);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 8dfa51a..2276953 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -246,12 +246,7 @@ public class AdaptiveScheduler
this.checkpointsCleaner = checkpointsCleaner;
this.completedCheckpointStore =
SchedulerUtils.createCompletedCheckpointStoreIfCheckpointingIsEnabled(
- jobGraph,
- configuration,
- userCodeClassLoader,
- checkpointRecoveryFactory,
- ioExecutor,
- LOG);
+ jobGraph, configuration, checkpointRecoveryFactory, ioExecutor, LOG);
this.checkpointIdCounter =
SchedulerUtils.createCheckpointIDCounterIfCheckpointingIsEnabled(
jobGraph, checkpointRecoveryFactory);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java
index acd9d80..2fe1736 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java
@@ -40,15 +40,12 @@ public class PerJobCheckpointRecoveryTest extends TestLogger {
final CheckpointRecoveryFactory factory =
PerJobCheckpointRecoveryFactory.withoutCheckpointStoreRecovery(
maxCheckpoints -> store);
- final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
-
final JobID firstJobId = new JobID();
assertSame(
store,
factory.createRecoveredCompletedCheckpointStore(
firstJobId,
1,
- classLoader,
SharedStateRegistry.DEFAULT_FACTORY,
Executors.directExecutor()));
assertThrows(
@@ -57,7 +54,6 @@ public class PerJobCheckpointRecoveryTest extends TestLogger {
factory.createRecoveredCompletedCheckpointStore(
firstJobId,
1,
- classLoader,
SharedStateRegistry.DEFAULT_FACTORY,
Executors.directExecutor()));
@@ -67,7 +63,6 @@ public class PerJobCheckpointRecoveryTest extends TestLogger {
factory.createRecoveredCompletedCheckpointStore(
secondJobId,
1,
- classLoader,
SharedStateRegistry.DEFAULT_FACTORY,
Executors.directExecutor()));
assertThrows(
@@ -76,7 +71,6 @@ public class PerJobCheckpointRecoveryTest extends TestLogger {
factory.createRecoveredCompletedCheckpointStore(
secondJobId,
1,
- classLoader,
SharedStateRegistry.DEFAULT_FACTORY,
Executors.directExecutor()));
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
index c1e8514..e164543 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
@@ -38,7 +38,6 @@ public class TestingCheckpointRecoveryFactory implements CheckpointRecoveryFacto
public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
JobID jobId,
int maxNumberOfCheckpointsToRetain,
- ClassLoader userClassLoader,
SharedStateRegistryFactory sharedStateRegistryFactory,
Executor ioExecutor) {
return store;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
index 8f5bffc..641a918 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
@@ -75,7 +75,6 @@ public class SchedulerUtilsTest extends TestLogger {
final CompletedCheckpointStore completedCheckpointStore =
SchedulerUtils.createCompletedCheckpointStore(
jobManagerConfig,
- getClass().getClassLoader(),
new StandaloneCheckpointRecoveryFactory(),
Executors.directExecutor(),
log,
@@ -102,7 +101,6 @@ public class SchedulerUtilsTest extends TestLogger {
CompletedCheckpointStore checkpointStore =
SchedulerUtils.createCompletedCheckpointStore(
new Configuration(),
- getClass().getClassLoader(),
recoveryFactory,
Executors.directExecutor(),
log,
@@ -123,7 +121,6 @@ public class SchedulerUtilsTest extends TestLogger {
public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
JobID jobId,
int maxNumberOfCheckpointsToRetain,
- ClassLoader userClassLoader,
SharedStateRegistryFactory sharedStateRegistryFactory,
Executor ioExecutor) {
List<CompletedCheckpoint> checkpoints = singletonList(checkpoint);