You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2022/07/08 15:35:33 UTC
[flink] branch master updated: [FLINK-28172][changelog] Scatter dstl files into separate directories by job id
This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 135130379e5 [FLINK-28172][changelog] Scatter dstl files into separate directories by job id
135130379e5 is described below
commit 135130379e592d9ae752bc4eea20b6fb222f8c15
Author: wangfeifan <zo...@163.com>
AuthorDate: Tue Jun 28 16:33:11 2022 +0800
[FLINK-28172][changelog] Scatter dstl files into separate directories by job id
---
.../changelog/fs/FsStateChangelogStorage.java | 15 ++++--
.../fs/FsStateChangelogStorageFactory.java | 6 ++-
.../flink/changelog/fs/StateChangeFsUploader.java | 14 ++++-
.../changelog/fs/StateChangeUploadScheduler.java | 3 ++
.../changelog/fs/ChangelogStorageMetricsTest.java | 8 ++-
.../changelog/fs/FsStateChangelogStorageTest.java | 2 +
.../changelog/fs/StateChangeFsUploaderTest.java | 59 ++++++++++++++++++++++
.../TaskExecutorStateChangelogStoragesManager.java | 2 +-
.../changelog/StateChangelogStorageFactory.java | 4 +-
.../changelog/StateChangelogStorageLoader.java | 6 ++-
.../InMemoryStateChangelogStorageFactory.java | 3 +-
...kExecutorStateChangelogStoragesManagerTest.java | 2 +-
.../inmemory/StateChangelogStorageLoaderTest.java | 12 +++--
.../changelog/ChangelogStateBackendTestUtils.java | 1 +
14 files changed, 121 insertions(+), 16 deletions(-)
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
index df12cad0b97..972135b7845 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
@@ -19,6 +19,7 @@ package org.apache.flink.changelog.fs;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
@@ -61,24 +62,31 @@ public class FsStateChangelogStorage extends FsStateChangelogStorageForRecovery
private final TaskChangelogRegistry changelogRegistry;
- public FsStateChangelogStorage(Configuration config, TaskManagerJobMetricGroup metricGroup)
+ public FsStateChangelogStorage(
+ JobID jobID, Configuration config, TaskManagerJobMetricGroup metricGroup)
throws IOException {
- this(config, metricGroup, defaultChangelogRegistry(config.get(NUM_DISCARD_THREADS)));
+ this(jobID, config, metricGroup, defaultChangelogRegistry(config.get(NUM_DISCARD_THREADS)));
}
public FsStateChangelogStorage(
+ JobID jobID,
Configuration config,
TaskManagerJobMetricGroup metricGroup,
TaskChangelogRegistry changelogRegistry)
throws IOException {
this(
- fromConfig(config, new ChangelogStorageMetricGroup(metricGroup), changelogRegistry),
+ fromConfig(
+ jobID,
+ config,
+ new ChangelogStorageMetricGroup(metricGroup),
+ changelogRegistry),
config.get(PREEMPTIVE_PERSIST_THRESHOLD).getBytes(),
changelogRegistry);
}
@VisibleForTesting
public FsStateChangelogStorage(
+ JobID jobID,
Path basePath,
boolean compression,
int bufferSize,
@@ -88,6 +96,7 @@ public class FsStateChangelogStorage extends FsStateChangelogStorageForRecovery
this(
directScheduler(
new StateChangeFsUploader(
+ jobID,
basePath,
basePath.getFileSystem(),
compression,
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageFactory.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageFactory.java
index f76eed1f8ce..561a25de153 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageFactory.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageFactory.java
@@ -18,6 +18,7 @@
package org.apache.flink.changelog.fs;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
@@ -46,8 +47,9 @@ public class FsStateChangelogStorageFactory implements StateChangelogStorageFact
@Override
public StateChangelogStorage<?> createStorage(
- Configuration configuration, TaskManagerJobMetricGroup metricGroup) throws IOException {
- return new FsStateChangelogStorage(configuration, metricGroup);
+ JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup)
+ throws IOException {
+ return new FsStateChangelogStorage(jobID, configuration, metricGroup);
}
@Override
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
index 28ecf03c78e..e60c28f6976 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
@@ -18,6 +18,7 @@
package org.apache.flink.changelog.fs;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
@@ -54,6 +55,8 @@ import static org.apache.flink.core.fs.FileSystem.WriteMode.NO_OVERWRITE;
public class StateChangeFsUploader implements StateChangeUploader {
private static final Logger LOG = LoggerFactory.getLogger(StateChangeFsUploader.class);
+ @VisibleForTesting public static final String PATH_SUB_DIR = "dstl";
+
private final Path basePath;
private final FileSystem fileSystem;
private final StateChangeFormat format;
@@ -66,6 +69,7 @@ public class StateChangeFsUploader implements StateChangeUploader {
@VisibleForTesting
public StateChangeFsUploader(
+ JobID jobID,
Path basePath,
FileSystem fileSystem,
boolean compression,
@@ -73,6 +77,7 @@ public class StateChangeFsUploader implements StateChangeUploader {
ChangelogStorageMetricGroup metrics,
TaskChangelogRegistry changelogRegistry) {
this(
+ jobID,
basePath,
fileSystem,
compression,
@@ -83,6 +88,7 @@ public class StateChangeFsUploader implements StateChangeUploader {
}
public StateChangeFsUploader(
+ JobID jobID,
Path basePath,
FileSystem fileSystem,
boolean compression,
@@ -90,7 +96,8 @@ public class StateChangeFsUploader implements StateChangeUploader {
ChangelogStorageMetricGroup metrics,
TaskChangelogRegistry changelogRegistry,
BiFunction<Path, Long, StreamStateHandle> handleFactory) {
- this.basePath = basePath;
+ this.basePath =
+ new Path(basePath, String.format("%s/%s", jobID.toHexString(), PATH_SUB_DIR));
this.fileSystem = fileSystem;
this.format = new StateChangeFormat();
this.compression = compression;
@@ -101,6 +108,11 @@ public class StateChangeFsUploader implements StateChangeUploader {
this.handleFactory = handleFactory;
}
+ @VisibleForTesting
+ public Path getBasePath() {
+ return this.basePath;
+ }
+
public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
final String fileName = generateFileName();
LOG.debug("upload {} tasks to {}", tasks.size(), fileName);
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java
index 8c82b6602ac..6e3b31a8ab3 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java
@@ -18,6 +18,7 @@
package org.apache.flink.changelog.fs;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.io.AvailabilityProvider;
@@ -84,6 +85,7 @@ public interface StateChangeUploadScheduler extends AutoCloseable {
}
static StateChangeUploadScheduler fromConfig(
+ JobID jobID,
ReadableConfig config,
ChangelogStorageMetricGroup metricGroup,
TaskChangelogRegistry changelogRegistry)
@@ -94,6 +96,7 @@ public interface StateChangeUploadScheduler extends AutoCloseable {
int bufferSize = (int) bytes;
StateChangeFsUploader store =
new StateChangeFsUploader(
+ jobID,
basePath,
basePath.getFileSystem(),
config.get(COMPRESSION_ENABLED),
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
index 1002b713829..25acff4f606 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
@@ -64,6 +64,7 @@ public class ChangelogStorageMetricsTest {
try (FsStateChangelogStorage storage =
new FsStateChangelogStorage(
+ JobID.generate(),
Path.fromLocalFile(tempFolder.toFile()),
false,
100,
@@ -88,6 +89,7 @@ public class ChangelogStorageMetricsTest {
try (FsStateChangelogStorage storage =
new FsStateChangelogStorage(
+ JobID.generate(),
Path.fromLocalFile(tempFolder.toFile()),
false,
100,
@@ -120,6 +122,7 @@ public class ChangelogStorageMetricsTest {
new ChangelogStorageMetricGroup(createUnregisteredTaskManagerJobMetricGroup());
try (FsStateChangelogStorage storage =
new FsStateChangelogStorage(
+ JobID.generate(),
Path.fromLocalFile(file),
false,
100,
@@ -150,6 +153,7 @@ public class ChangelogStorageMetricsTest {
Path basePath = Path.fromLocalFile(tempFolder.toFile());
StateChangeFsUploader uploader =
new StateChangeFsUploader(
+ JobID.generate(),
basePath,
basePath.getFileSystem(),
false,
@@ -287,6 +291,7 @@ public class ChangelogStorageMetricsTest {
@Test
void testQueueSize() throws Exception {
+ JobID jobID = JobID.generate();
AtomicReference<Gauge<Integer>> queueSizeGauge = new AtomicReference<>();
ChangelogStorageMetricGroup metrics =
new ChangelogStorageMetricGroup(
@@ -301,12 +306,13 @@ public class ChangelogStorageMetricsTest {
})
.build(),
createUnregisteredTaskManagerMetricGroup(),
- new JobID(),
+ jobID,
"test"));
Path path = Path.fromLocalFile(tempFolder.toFile());
StateChangeFsUploader delegate =
new StateChangeFsUploader(
+ jobID,
path,
path.getFileSystem(),
false,
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java
index 906806675a4..01b45762f5c 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java
@@ -17,6 +17,7 @@
package org.apache.flink.changelog.fs;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.changelog.fs.BatchingStateChangeUploadSchedulerTest.BlockingUploader;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -51,6 +52,7 @@ public class FsStateChangelogStorageTest
protected StateChangelogStorage<ChangelogStateHandleStreamImpl> getFactory(
boolean compression, File temporaryFolder) throws IOException {
return new FsStateChangelogStorage(
+ JobID.generate(),
Path.fromLocalFile(temporaryFolder),
compression,
1024 * 1024 * 10,
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/StateChangeFsUploaderTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/StateChangeFsUploaderTest.java
new file mode 100644
index 00000000000..d690f951635
--- /dev/null
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/StateChangeFsUploaderTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** {@link StateChangeFsUploader} test. */
+class StateChangeFsUploaderTest {
+
+    /** Verifies that the uploader's base path is {@code <root>/<job-id-hex>/<PATH_SUB_DIR>}. */
+    @Test
+    void testBasePath() throws IOException {
+        JobID jobID = JobID.generate();
+        String rootPath = "/dstl-root-path";
+        Path oriBasePath = new Path(rootPath);
+        ChangelogStorageMetricGroup metrics =
+                new ChangelogStorageMetricGroup(createUnregisteredTaskManagerJobMetricGroup());
+
+        StateChangeFsUploader uploader =
+                new StateChangeFsUploader(
+                        jobID,
+                        oriBasePath,
+                        oriBasePath.getFileSystem(),
+                        false,
+                        4096,
+                        metrics,
+                        TaskChangelogRegistry.NO_OP);
+
+        assertEquals(
+                String.format(
+                        "%s/%s/%s",
+                        rootPath, jobID.toHexString(), StateChangeFsUploader.PATH_SUB_DIR),
+                uploader.getBasePath().getPath());
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
index 227b37fedfc..d5cf1711a93 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
@@ -90,7 +90,7 @@ public class TaskExecutorStateChangelogStoragesManager {
if (stateChangelogStorage == null) {
StateChangelogStorage<?> loaded =
- StateChangelogStorageLoader.load(configuration, metricGroup);
+ StateChangelogStorageLoader.load(jobId, configuration, metricGroup);
stateChangelogStorage = Optional.ofNullable(loaded);
changelogStoragesByJobId.put(jobId, stateChangelogStorage);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java
index 2af108181ca..f60ad065094 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.state.changelog;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
@@ -35,7 +36,8 @@ public interface StateChangelogStorageFactory {
/** Create the storage based on a configuration. */
StateChangelogStorage<?> createStorage(
- Configuration configuration, TaskManagerJobMetricGroup metricGroup) throws IOException;
+ JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup)
+ throws IOException;
/** Create the storage for recovery. */
StateChangelogStorageView<?> createStorageView() throws IOException;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java
index 28834aef2d1..7947e83e145 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.state.changelog;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.core.plugin.PluginManager;
@@ -86,7 +87,8 @@ public class StateChangelogStorageLoader {
@Nullable
public static StateChangelogStorage<?> load(
- Configuration configuration, TaskManagerJobMetricGroup metricGroup) throws IOException {
+ JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup)
+ throws IOException {
final String identifier =
configuration
.getString(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE)
@@ -98,7 +100,7 @@ public class StateChangelogStorageLoader {
return null;
} else {
LOG.info("Creating a changelog storage with name '{}'.", identifier);
- return factory.createStorage(configuration, metricGroup);
+ return factory.createStorage(jobID, configuration, metricGroup);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorageFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorageFactory.java
index c55ec12cc07..d09d638d242 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorageFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorageFactory.java
@@ -17,6 +17,7 @@
package org.apache.flink.runtime.state.changelog.inmemory;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
@@ -35,7 +36,7 @@ public class InMemoryStateChangelogStorageFactory implements StateChangelogStora
@Override
public StateChangelogStorage<?> createStorage(
- Configuration configuration, TaskManagerJobMetricGroup metricGroup) {
+ JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup) {
return new InMemoryStateChangelogStorage();
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java
index 77e9a09ed4c..cf6ef1deb3e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java
@@ -207,7 +207,7 @@ public class TaskExecutorStateChangelogStoragesManagerTest {
@Override
public StateChangelogStorage<?> createStorage(
- Configuration configuration, TaskManagerJobMetricGroup metricGroup) {
+ JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup) {
return new TestStateChangelogStorage();
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
index 64d00c38d89..988a707242c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
@@ -17,6 +17,7 @@
package org.apache.flink.runtime.state.changelog.inmemory;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateChangelogOptions;
@@ -52,7 +53,9 @@ public class StateChangelogStorageLoaderTest {
StateChangelogStorageLoader.initialize(getPluginManager(emptyIterator()));
assertNotNull(
StateChangelogStorageLoader.load(
- new Configuration(), createUnregisteredTaskManagerJobMetricGroup()));
+ JobID.generate(),
+ new Configuration(),
+ createUnregisteredTaskManagerJobMetricGroup()));
}
@Test
@@ -60,6 +63,7 @@ public class StateChangelogStorageLoaderTest {
StateChangelogStorageLoader.initialize(getPluginManager(emptyIterator()));
assertNull(
StateChangelogStorageLoader.load(
+ JobID.generate(),
new Configuration()
.set(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE, "not_exist"),
createUnregisteredTaskManagerJobMetricGroup()));
@@ -73,7 +77,9 @@ public class StateChangelogStorageLoaderTest {
StateChangelogStorageLoader.initialize(pluginManager);
StateChangelogStorage loaded =
StateChangelogStorageLoader.load(
- new Configuration(), createUnregisteredTaskManagerJobMetricGroup());
+ JobID.generate(),
+ new Configuration(),
+ createUnregisteredTaskManagerJobMetricGroup());
assertTrue(loaded instanceof TestStateChangelogStorage);
}
@@ -114,7 +120,7 @@ public class StateChangelogStorageLoaderTest {
@Override
public StateChangelogStorage<?> createStorage(
- Configuration configuration, TaskManagerJobMetricGroup metricGroup) {
+ JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup) {
return new TestStateChangelogStorage();
}
diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
index e92b025b08e..6a3a41bccd1 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
@@ -183,6 +183,7 @@ public class ChangelogStateBackendTestUtils {
return TestTaskStateManager.builder()
.setStateChangelogStorage(
new FsStateChangelogStorage(
+ JobID.generate(),
Path.fromLocalFile(changelogStoragePath),
false,
1024,