You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/07/08 15:35:33 UTC

[flink] branch master updated: [FLINK-28172][changelog] Scatter dstl files into separate directories by job id

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 135130379e5 [FLINK-28172][changelog] Scatter dstl files into separate directories by job id
135130379e5 is described below

commit 135130379e592d9ae752bc4eea20b6fb222f8c15
Author: wangfeifan <zo...@163.com>
AuthorDate: Tue Jun 28 16:33:11 2022 +0800

    [FLINK-28172][changelog] Scatter dstl files into separate directories by job id
---
 .../changelog/fs/FsStateChangelogStorage.java      | 15 ++++--
 .../fs/FsStateChangelogStorageFactory.java         |  6 ++-
 .../flink/changelog/fs/StateChangeFsUploader.java  | 14 ++++-
 .../changelog/fs/StateChangeUploadScheduler.java   |  3 ++
 .../changelog/fs/ChangelogStorageMetricsTest.java  |  8 ++-
 .../changelog/fs/FsStateChangelogStorageTest.java  |  2 +
 .../changelog/fs/StateChangeFsUploaderTest.java    | 59 ++++++++++++++++++++++
 .../TaskExecutorStateChangelogStoragesManager.java |  2 +-
 .../changelog/StateChangelogStorageFactory.java    |  4 +-
 .../changelog/StateChangelogStorageLoader.java     |  6 ++-
 .../InMemoryStateChangelogStorageFactory.java      |  3 +-
 ...kExecutorStateChangelogStoragesManagerTest.java |  2 +-
 .../inmemory/StateChangelogStorageLoaderTest.java  | 12 +++--
 .../changelog/ChangelogStateBackendTestUtils.java  |  1 +
 14 files changed, 121 insertions(+), 16 deletions(-)

diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
index df12cad0b97..972135b7845 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
@@ -19,6 +19,7 @@ package org.apache.flink.changelog.fs;
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
@@ -61,24 +62,31 @@ public class FsStateChangelogStorage extends FsStateChangelogStorageForRecovery
 
     private final TaskChangelogRegistry changelogRegistry;
 
-    public FsStateChangelogStorage(Configuration config, TaskManagerJobMetricGroup metricGroup)
+    public FsStateChangelogStorage(
+            JobID jobID, Configuration config, TaskManagerJobMetricGroup metricGroup)
             throws IOException {
-        this(config, metricGroup, defaultChangelogRegistry(config.get(NUM_DISCARD_THREADS)));
+        this(jobID, config, metricGroup, defaultChangelogRegistry(config.get(NUM_DISCARD_THREADS)));
     }
 
     public FsStateChangelogStorage(
+            JobID jobID,
             Configuration config,
             TaskManagerJobMetricGroup metricGroup,
             TaskChangelogRegistry changelogRegistry)
             throws IOException {
         this(
-                fromConfig(config, new ChangelogStorageMetricGroup(metricGroup), changelogRegistry),
+                fromConfig(
+                        jobID,
+                        config,
+                        new ChangelogStorageMetricGroup(metricGroup),
+                        changelogRegistry),
                 config.get(PREEMPTIVE_PERSIST_THRESHOLD).getBytes(),
                 changelogRegistry);
     }
 
     @VisibleForTesting
     public FsStateChangelogStorage(
+            JobID jobID,
             Path basePath,
             boolean compression,
             int bufferSize,
@@ -88,6 +96,7 @@ public class FsStateChangelogStorage extends FsStateChangelogStorageForRecovery
         this(
                 directScheduler(
                         new StateChangeFsUploader(
+                                jobID,
                                 basePath,
                                 basePath.getFileSystem(),
                                 compression,
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageFactory.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageFactory.java
index f76eed1f8ce..561a25de153 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageFactory.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageFactory.java
@@ -18,6 +18,7 @@
 package org.apache.flink.changelog.fs;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
@@ -46,8 +47,9 @@ public class FsStateChangelogStorageFactory implements StateChangelogStorageFact
 
     @Override
     public StateChangelogStorage<?> createStorage(
-            Configuration configuration, TaskManagerJobMetricGroup metricGroup) throws IOException {
-        return new FsStateChangelogStorage(configuration, metricGroup);
+            JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup)
+            throws IOException {
+        return new FsStateChangelogStorage(jobID, configuration, metricGroup);
     }
 
     @Override
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
index 28ecf03c78e..e60c28f6976 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
@@ -18,6 +18,7 @@
 package org.apache.flink.changelog.fs;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
@@ -54,6 +55,8 @@ import static org.apache.flink.core.fs.FileSystem.WriteMode.NO_OVERWRITE;
 public class StateChangeFsUploader implements StateChangeUploader {
     private static final Logger LOG = LoggerFactory.getLogger(StateChangeFsUploader.class);
 
+    @VisibleForTesting public static final String PATH_SUB_DIR = "dstl";
+
     private final Path basePath;
     private final FileSystem fileSystem;
     private final StateChangeFormat format;
@@ -66,6 +69,7 @@ public class StateChangeFsUploader implements StateChangeUploader {
 
     @VisibleForTesting
     public StateChangeFsUploader(
+            JobID jobID,
             Path basePath,
             FileSystem fileSystem,
             boolean compression,
@@ -73,6 +77,7 @@ public class StateChangeFsUploader implements StateChangeUploader {
             ChangelogStorageMetricGroup metrics,
             TaskChangelogRegistry changelogRegistry) {
         this(
+                jobID,
                 basePath,
                 fileSystem,
                 compression,
@@ -83,6 +88,7 @@ public class StateChangeFsUploader implements StateChangeUploader {
     }
 
     public StateChangeFsUploader(
+            JobID jobID,
             Path basePath,
             FileSystem fileSystem,
             boolean compression,
@@ -90,7 +96,8 @@ public class StateChangeFsUploader implements StateChangeUploader {
             ChangelogStorageMetricGroup metrics,
             TaskChangelogRegistry changelogRegistry,
             BiFunction<Path, Long, StreamStateHandle> handleFactory) {
-        this.basePath = basePath;
+        this.basePath =
+                new Path(basePath, String.format("%s/%s", jobID.toHexString(), PATH_SUB_DIR));
         this.fileSystem = fileSystem;
         this.format = new StateChangeFormat();
         this.compression = compression;
@@ -101,6 +108,11 @@ public class StateChangeFsUploader implements StateChangeUploader {
         this.handleFactory = handleFactory;
     }
 
+    @VisibleForTesting
+    public Path getBasePath() {
+        return this.basePath;
+    }
+
     public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
         final String fileName = generateFileName();
         LOG.debug("upload {} tasks to {}", tasks.size(), fileName);
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java
index 8c82b6602ac..6e3b31a8ab3 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java
@@ -18,6 +18,7 @@
 package org.apache.flink.changelog.fs;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.io.AvailabilityProvider;
@@ -84,6 +85,7 @@ public interface StateChangeUploadScheduler extends AutoCloseable {
     }
 
     static StateChangeUploadScheduler fromConfig(
+            JobID jobID,
             ReadableConfig config,
             ChangelogStorageMetricGroup metricGroup,
             TaskChangelogRegistry changelogRegistry)
@@ -94,6 +96,7 @@ public interface StateChangeUploadScheduler extends AutoCloseable {
         int bufferSize = (int) bytes;
         StateChangeFsUploader store =
                 new StateChangeFsUploader(
+                        jobID,
                         basePath,
                         basePath.getFileSystem(),
                         config.get(COMPRESSION_ENABLED),
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
index 1002b713829..25acff4f606 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
@@ -64,6 +64,7 @@ public class ChangelogStorageMetricsTest {
 
         try (FsStateChangelogStorage storage =
                 new FsStateChangelogStorage(
+                        JobID.generate(),
                         Path.fromLocalFile(tempFolder.toFile()),
                         false,
                         100,
@@ -88,6 +89,7 @@ public class ChangelogStorageMetricsTest {
 
         try (FsStateChangelogStorage storage =
                 new FsStateChangelogStorage(
+                        JobID.generate(),
                         Path.fromLocalFile(tempFolder.toFile()),
                         false,
                         100,
@@ -120,6 +122,7 @@ public class ChangelogStorageMetricsTest {
                 new ChangelogStorageMetricGroup(createUnregisteredTaskManagerJobMetricGroup());
         try (FsStateChangelogStorage storage =
                 new FsStateChangelogStorage(
+                        JobID.generate(),
                         Path.fromLocalFile(file),
                         false,
                         100,
@@ -150,6 +153,7 @@ public class ChangelogStorageMetricsTest {
         Path basePath = Path.fromLocalFile(tempFolder.toFile());
         StateChangeFsUploader uploader =
                 new StateChangeFsUploader(
+                        JobID.generate(),
                         basePath,
                         basePath.getFileSystem(),
                         false,
@@ -287,6 +291,7 @@ public class ChangelogStorageMetricsTest {
 
     @Test
     void testQueueSize() throws Exception {
+        JobID jobID = JobID.generate();
         AtomicReference<Gauge<Integer>> queueSizeGauge = new AtomicReference<>();
         ChangelogStorageMetricGroup metrics =
                 new ChangelogStorageMetricGroup(
@@ -301,12 +306,13 @@ public class ChangelogStorageMetricsTest {
                                                 })
                                         .build(),
                                 createUnregisteredTaskManagerMetricGroup(),
-                                new JobID(),
+                                jobID,
                                 "test"));
 
         Path path = Path.fromLocalFile(tempFolder.toFile());
         StateChangeFsUploader delegate =
                 new StateChangeFsUploader(
+                        jobID,
                         path,
                         path.getFileSystem(),
                         false,
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java
index 906806675a4..01b45762f5c 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.changelog.fs;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.changelog.fs.BatchingStateChangeUploadSchedulerTest.BlockingUploader;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -51,6 +52,7 @@ public class FsStateChangelogStorageTest
     protected StateChangelogStorage<ChangelogStateHandleStreamImpl> getFactory(
             boolean compression, File temporaryFolder) throws IOException {
         return new FsStateChangelogStorage(
+                JobID.generate(),
                 Path.fromLocalFile(temporaryFolder),
                 compression,
                 1024 * 1024 * 10,
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/StateChangeFsUploaderTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/StateChangeFsUploaderTest.java
new file mode 100644
index 00000000000..d690f951635
--- /dev/null
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/StateChangeFsUploaderTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Checks that {@link StateChangeFsUploader} nests its base path under the job id. */
+class StateChangeFsUploaderTest {
+
+    @Test
+    void testBasePath() throws IOException {
+        JobID jobID = JobID.generate();
+        String rootPath = "/dstl-root-path";
+        Path oriBasePath = new Path(rootPath);
+
+        ChangelogStorageMetricGroup metrics =
+                new ChangelogStorageMetricGroup(createUnregisteredTaskManagerJobMetricGroup());
+
+        StateChangeFsUploader uploader =
+                new StateChangeFsUploader(
+                        jobID,
+                        oriBasePath,
+                        oriBasePath.getFileSystem(),
+                        false,
+                        4096,
+                        metrics,
+                        TaskChangelogRegistry.NO_OP);
+
+        assertEquals(
+                String.format(
+                        "%s/%s/%s",
+                        rootPath, jobID.toHexString(), StateChangeFsUploader.PATH_SUB_DIR),
+                uploader.getBasePath().getPath());
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
index 227b37fedfc..d5cf1711a93 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
@@ -90,7 +90,7 @@ public class TaskExecutorStateChangelogStoragesManager {
 
             if (stateChangelogStorage == null) {
                 StateChangelogStorage<?> loaded =
-                        StateChangelogStorageLoader.load(configuration, metricGroup);
+                        StateChangelogStorageLoader.load(jobId, configuration, metricGroup);
                 stateChangelogStorage = Optional.ofNullable(loaded);
                 changelogStoragesByJobId.put(jobId, stateChangelogStorage);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java
index 2af108181ca..f60ad065094 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state.changelog;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 
@@ -35,7 +36,8 @@ public interface StateChangelogStorageFactory {
 
     /** Create the storage based on a configuration. */
     StateChangelogStorage<?> createStorage(
-            Configuration configuration, TaskManagerJobMetricGroup metricGroup) throws IOException;
+            JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup)
+            throws IOException;
 
     /** Create the storage for recovery. */
     StateChangelogStorageView<?> createStorageView() throws IOException;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java
index 28834aef2d1..7947e83e145 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.state.changelog;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.StateChangelogOptions;
 import org.apache.flink.core.plugin.PluginManager;
@@ -86,7 +87,8 @@ public class StateChangelogStorageLoader {
 
     @Nullable
     public static StateChangelogStorage<?> load(
-            Configuration configuration, TaskManagerJobMetricGroup metricGroup) throws IOException {
+            JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup)
+            throws IOException {
         final String identifier =
                 configuration
                         .getString(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE)
@@ -98,7 +100,7 @@ public class StateChangelogStorageLoader {
             return null;
         } else {
             LOG.info("Creating a changelog storage with name '{}'.", identifier);
-            return factory.createStorage(configuration, metricGroup);
+            return factory.createStorage(jobID, configuration, metricGroup);
         }
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorageFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorageFactory.java
index c55ec12cc07..d09d638d242 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorageFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorageFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.state.changelog.inmemory;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
@@ -35,7 +36,7 @@ public class InMemoryStateChangelogStorageFactory implements StateChangelogStora
 
     @Override
     public StateChangelogStorage<?> createStorage(
-            Configuration configuration, TaskManagerJobMetricGroup metricGroup) {
+            JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup) {
         return new InMemoryStateChangelogStorage();
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java
index 77e9a09ed4c..cf6ef1deb3e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java
@@ -207,7 +207,7 @@ public class TaskExecutorStateChangelogStoragesManagerTest {
 
         @Override
         public StateChangelogStorage<?> createStorage(
-                Configuration configuration, TaskManagerJobMetricGroup metricGroup) {
+                JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup) {
             return new TestStateChangelogStorage();
         }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
index 64d00c38d89..988a707242c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.state.changelog.inmemory;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.StateChangelogOptions;
@@ -52,7 +53,9 @@ public class StateChangelogStorageLoaderTest {
         StateChangelogStorageLoader.initialize(getPluginManager(emptyIterator()));
         assertNotNull(
                 StateChangelogStorageLoader.load(
-                        new Configuration(), createUnregisteredTaskManagerJobMetricGroup()));
+                        JobID.generate(),
+                        new Configuration(),
+                        createUnregisteredTaskManagerJobMetricGroup()));
     }
 
     @Test
@@ -60,6 +63,7 @@ public class StateChangelogStorageLoaderTest {
         StateChangelogStorageLoader.initialize(getPluginManager(emptyIterator()));
         assertNull(
                 StateChangelogStorageLoader.load(
+                        JobID.generate(),
                         new Configuration()
                                 .set(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE, "not_exist"),
                         createUnregisteredTaskManagerJobMetricGroup()));
@@ -73,7 +77,9 @@ public class StateChangelogStorageLoaderTest {
         StateChangelogStorageLoader.initialize(pluginManager);
         StateChangelogStorage loaded =
                 StateChangelogStorageLoader.load(
-                        new Configuration(), createUnregisteredTaskManagerJobMetricGroup());
+                        JobID.generate(),
+                        new Configuration(),
+                        createUnregisteredTaskManagerJobMetricGroup());
         assertTrue(loaded instanceof TestStateChangelogStorage);
     }
 
@@ -114,7 +120,7 @@ public class StateChangelogStorageLoaderTest {
 
         @Override
         public StateChangelogStorage<?> createStorage(
-                Configuration configuration, TaskManagerJobMetricGroup metricGroup) {
+                JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup) {
             return new TestStateChangelogStorage();
         }
 
diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
index e92b025b08e..6a3a41bccd1 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
@@ -183,6 +183,7 @@ public class ChangelogStateBackendTestUtils {
         return TestTaskStateManager.builder()
                 .setStateChangelogStorage(
                         new FsStateChangelogStorage(
+                                JobID.generate(),
                                 Path.fromLocalFile(changelogStoragePath),
                                 false,
                                 1024,