You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/03/22 07:04:50 UTC

[flink] branch master updated: [FLINK-25549][state/changelog] Migrate flink-dstl to use JUnit5

This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 480956f  [FLINK-25549][state/changelog] Migrate flink-dstl to use JUnit5
480956f is described below

commit 480956f3590426ead0578781d5ef36fc3a56b8df
Author: Roc Marshal <fl...@126.com>
AuthorDate: Fri Mar 4 08:24:53 2022 +0800

    [FLINK-25549][state/changelog] Migrate flink-dstl to use JUnit5
    
    This closes #18978.
---
 .../fs/BatchingStateChangeUploadSchedulerTest.java | 105 +++++++++---------
 .../changelog/fs/ChangelogStorageMetricsTest.java  |  59 ++++++-----
 .../changelog/fs/FsStateChangelogStorageTest.java  |  22 ++--
 .../fs/FsStateChangelogWriterSqnTest.java          |  61 +++++------
 .../changelog/fs/FsStateChangelogWriterTest.java   | 117 +++++++++++----------
 .../flink/changelog/fs/RetryingExecutorTest.java   |  44 ++++----
 .../changelog/fs/TestingStateChangeUploader.java   |   1 +
 .../inmemory/StateChangelogStorageTest.java        |  63 ++++++-----
 8 files changed, 243 insertions(+), 229 deletions(-)

diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
index 2b13d91..621d46b 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.util.function.BiConsumerWithException;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -56,19 +56,18 @@ import static java.util.stream.Collectors.toMap;
 import static org.apache.flink.changelog.fs.UnregisteredChangelogStorageMetricGroup.createUnregisteredChangelogStorageMetricGroup;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.ExceptionUtils.rethrow;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
 
 /** {@link BatchingStateChangeUploadScheduler} test. */
-public class BatchingStateChangeUploadSchedulerTest {
+class BatchingStateChangeUploadSchedulerTest {
 
     private static final int MAX_BYTES_IN_FLIGHT = 10_000;
     private final Random random = new Random();
 
     @Test
-    public void testNoDelayAndThreshold() throws Exception {
+    void testNoDelayAndThreshold() throws Exception {
         withStore(
                 0,
                 0,
@@ -89,7 +88,7 @@ public class BatchingStateChangeUploadSchedulerTest {
     }
 
     @Test
-    public void testSizeThreshold() throws Exception {
+    void testSizeThreshold() throws Exception {
         int numChanges = 7;
         int changeSize = 11;
         int threshold = changeSize * numChanges;
@@ -108,14 +107,14 @@ public class BatchingStateChangeUploadSchedulerTest {
                         if (runningSize >= threshold) {
                             assertSaved(probe, expected);
                         } else {
-                            assertTrue(probe.getUploaded().isEmpty());
+                            assertThat(probe.getUploaded()).isEmpty();
                         }
                     }
                 });
     }
 
     @Test
-    public void testDelay() throws Exception {
+    void testDelay() throws Exception {
         int delayMs = 50;
         ManuallyTriggeredScheduledExecutorService scheduler =
                 new ManuallyTriggeredScheduledExecutorService();
@@ -128,20 +127,17 @@ public class BatchingStateChangeUploadSchedulerTest {
                     scheduler.triggerAll();
                     List<StateChangeSet> changeSets = getChanges(4);
                     upload(store, changeSets);
-                    assertTrue(probe.getUploaded().isEmpty());
-                    assertTrue(
-                            scheduler.getAllNonPeriodicScheduledTask().stream()
-                                    .anyMatch(
-                                            scheduled ->
-                                                    scheduled.getDelay(MILLISECONDS) == delayMs));
+                    assertThat(probe.getUploaded()).isEmpty();
+                    assertThat(scheduler.getAllNonPeriodicScheduledTask())
+                            .anyMatch(scheduled -> scheduled.getDelay(MILLISECONDS) == delayMs);
                     scheduler.triggerAllNonPeriodicTasks();
-                    assertEquals(changeSets, probe.getUploaded());
+                    assertThat(probe.getUploaded()).isEqualTo(changeSets);
                 });
     }
 
     /** Test integration with {@link RetryingExecutor}. */
     @Test
-    public void testRetry() throws Exception {
+    void testRetry() throws Exception {
         final int maxAttempts = 5;
 
         try (BatchingStateChangeUploadScheduler store =
@@ -184,7 +180,7 @@ public class BatchingStateChangeUploadSchedulerTest {
     }
 
     @Test
-    public void testUploadTimeout() throws Exception {
+    void testUploadTimeout() throws Exception {
         AtomicReference<List<SequenceNumber>> failed = new AtomicReference<>();
         UploadTask upload =
                 new UploadTask(getChanges(4), unused -> {}, (sqn, error) -> failed.set(sqn));
@@ -201,12 +197,12 @@ public class BatchingStateChangeUploadSchedulerTest {
             }
         }
 
-        assertTrue(upload.finished.get());
-        assertEquals(
-                upload.changeSets.stream()
-                        .map(StateChangeSet::getSequenceNumber)
-                        .collect(Collectors.toSet()),
-                new HashSet<>(failed.get()));
+        assertThat(upload.finished.get()).isTrue();
+        assertThat(
+                        upload.changeSets.stream()
+                                .map(StateChangeSet::getSequenceNumber)
+                                .collect(Collectors.toSet()))
+                .isEqualTo(new HashSet<>(failed.get()));
     }
 
     @Test
@@ -236,19 +232,20 @@ public class BatchingStateChangeUploadSchedulerTest {
             }
         }
 
-        assertTrue(upload.finished.get());
-        assertEquals(
-                upload.changeSets.stream()
-                        .map(StateChangeSet::getSequenceNumber)
-                        .collect(Collectors.toSet()),
-                succeeded.get().stream()
-                        .map(UploadResult::getSequenceNumber)
-                        .collect(Collectors.toSet()));
-        assertTrue(failed.get().isEmpty());
+        assertThat(upload.finished.get()).isTrue();
+        assertThat(
+                        upload.changeSets.stream()
+                                .map(StateChangeSet::getSequenceNumber)
+                                .collect(Collectors.toSet()))
+                .isEqualTo(
+                        succeeded.get().stream()
+                                .map(UploadResult::getSequenceNumber)
+                                .collect(Collectors.toSet()));
+        assertThat(failed.get()).isEmpty();
     }
 
-    @Test(expected = RejectedExecutionException.class)
-    public void testErrorHandling() throws Exception {
+    @Test
+    void testErrorHandling() throws Exception {
         TestingStateChangeUploader probe = new TestingStateChangeUploader();
         DirectScheduledExecutorService scheduler = new DirectScheduledExecutorService();
         try (BatchingStateChangeUploadScheduler store =
@@ -265,12 +262,13 @@ public class BatchingStateChangeUploadSchedulerTest {
                                         .getAttemptsPerUpload()),
                         createUnregisteredChangelogStorageMetricGroup())) {
             scheduler.shutdown();
-            upload(store, getChanges(4));
+            assertThatThrownBy(() -> upload(store, getChanges(4)))
+                    .isInstanceOf(RejectedExecutionException.class);
         }
     }
 
     @Test
-    public void testClose() throws Exception {
+    void testClose() throws Exception {
         TestingStateChangeUploader probe = new TestingStateChangeUploader();
         DirectScheduledExecutorService scheduler = new DirectScheduledExecutorService();
         DirectScheduledExecutorService retryScheduler = new DirectScheduledExecutorService();
@@ -287,13 +285,13 @@ public class BatchingStateChangeUploadSchedulerTest {
                                         .getAttemptsPerUpload()),
                         createUnregisteredChangelogStorageMetricGroup())
                 .close();
-        assertTrue(probe.isClosed());
-        assertTrue(scheduler.isShutdown());
-        assertTrue(retryScheduler.isShutdown());
+        assertThat(probe.isClosed()).isTrue();
+        assertThat(scheduler.isShutdown()).isTrue();
+        assertThat(retryScheduler.isShutdown()).isTrue();
     }
 
     @Test
-    public void testBackPressure() throws Exception {
+    void testBackPressure() throws Exception {
         int sizeLimit = MAX_BYTES_IN_FLIGHT;
         CompletableFuture<TestingStateChangeUploader> thresholdExceededFuture =
                 new CompletableFuture<>();
@@ -306,13 +304,14 @@ public class BatchingStateChangeUploadSchedulerTest {
                                             getChanges(sizeLimit / 2).stream(),
                                             getChanges(sizeLimit / 2).stream())
                                     .collect(Collectors.toList());
-                    assertTrue(uploader.getAvailabilityProvider().isAvailable());
-                    assertTrue(uploader.getAvailabilityProvider().isApproximatelyAvailable());
+                    assertThat(uploader.getAvailabilityProvider().isAvailable()).isTrue();
+                    assertThat(uploader.getAvailabilityProvider().isApproximatelyAvailable())
+                            .isTrue();
                     upload(uploader, changes1);
                     assertSaved(probe, changes1); // sent to upload, not finished yet
                     thresholdExceededFuture.complete(probe);
                     List<StateChangeSet> changes2 = getChanges(1);
-                    assertFalse(uploader.getAvailabilityProvider().isAvailable());
+                    assertThat(uploader.getAvailabilityProvider().isAvailable()).isFalse();
                     upload(uploader, changes2); // should block until capacity released
                     assertSaved(probe, changes1, changes2);
                 };
@@ -322,14 +321,14 @@ public class BatchingStateChangeUploadSchedulerTest {
         TestingStateChangeUploader probe = thresholdExceededFuture.get();
         int uploadedInTheBeginning = probe.getUploaded().size();
         Thread.sleep(500); // allow failing, i.e. to proceed with upload
-        assertEquals(uploadedInTheBeginning, probe.getUploaded().size());
+        assertThat(probe.getUploaded().size()).isEqualTo(uploadedInTheBeginning);
         probe.completeUpload(); // release capacity
         uploadFuture.join();
-        assertTrue(uploadedInTheBeginning < probe.getUploaded().size());
+        assertThat(probe.getUploaded().size()).isGreaterThan(uploadedInTheBeginning);
     }
 
     @Test
-    public void testInterruptedWhenBackPressured() throws Exception {
+    void testInterruptedWhenBackPressured() throws Exception {
         int limit = MAX_BYTES_IN_FLIGHT;
         TestScenario test =
                 (uploader, probe) -> {
@@ -342,7 +341,7 @@ public class BatchingStateChangeUploadSchedulerTest {
                         fail("upload shouldn't succeed after exceeding the limit");
                     } catch (IOException e) {
                         if (findThrowable(e, InterruptedException.class).isPresent()) {
-                            assertTrue(probe.getUploaded().isEmpty());
+                            assertThat(probe.getUploaded()).isEmpty();
                         } else {
                             rethrow(e);
                         }
@@ -404,9 +403,11 @@ public class BatchingStateChangeUploadSchedulerTest {
     @SafeVarargs
     private final void assertSaved(
             TestingStateChangeUploader probe, List<StateChangeSet>... expected) {
-        assertEquals(
-                Arrays.stream(expected).flatMap(Collection::stream).collect(Collectors.toList()),
-                new ArrayList<>(probe.getUploaded()));
+        assertThat(new ArrayList<>(probe.getUploaded()))
+                .isEqualTo(
+                        Arrays.stream(expected)
+                                .flatMap(Collection::stream)
+                                .collect(Collectors.toList()));
     }
 
     private interface TestScenario
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
index 3444139..24280c8 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
@@ -29,15 +29,16 @@ import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
 
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -47,42 +48,41 @@ import static org.apache.flink.changelog.fs.ChangelogStorageMetricGroup.CHANGELO
 import static org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup;
 import static org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup;
 import static org.apache.flink.runtime.state.KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** {@link ChangelogStorageMetricGroup} test. */
 public class ChangelogStorageMetricsTest {
-    @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @TempDir java.nio.file.Path tempFolder;
 
     @Test
-    public void testUploadsCounter() throws Exception {
+    void testUploadsCounter() throws Exception {
         ChangelogStorageMetricGroup metrics =
                 new ChangelogStorageMetricGroup(createUnregisteredTaskManagerJobMetricGroup());
 
         try (FsStateChangelogStorage storage =
                 new FsStateChangelogStorage(
-                        Path.fromLocalFile(temporaryFolder.newFolder()), false, 100, metrics)) {
+                        Path.fromLocalFile(tempFolder.toFile()), false, 100, metrics)) {
             FsStateChangelogWriter writer = createWriter(storage);
-
             int numUploads = 5;
             for (int i = 0; i < numUploads; i++) {
                 SequenceNumber from = writer.nextSequenceNumber();
                 writer.append(0, new byte[] {0, 1, 2, 3});
                 writer.persist(from).get();
             }
-            assertEquals(numUploads, metrics.getUploadsCounter().getCount());
-            assertTrue(metrics.getUploadLatenciesNanos().getStatistics().getMin() > 0);
+            assertThat(metrics.getUploadsCounter().getCount()).isEqualTo(numUploads);
+            assertThat(metrics.getUploadLatenciesNanos().getStatistics().getMin()).isGreaterThan(0);
         }
     }
 
     @Test
-    public void testUploadSizes() throws Exception {
+    void testUploadSizes() throws Exception {
         ChangelogStorageMetricGroup metrics =
                 new ChangelogStorageMetricGroup(createUnregisteredTaskManagerJobMetricGroup());
 
         try (FsStateChangelogStorage storage =
                 new FsStateChangelogStorage(
-                        Path.fromLocalFile(temporaryFolder.newFolder()), false, 100, metrics)) {
+                        Path.fromLocalFile(tempFolder.toFile()), false, 100, metrics)) {
             FsStateChangelogWriter writer = createWriter(storage);
 
             // upload single byte to infer header size
@@ -98,13 +98,14 @@ public class ChangelogStorageMetricsTest {
                 writer.persist(from).get();
             }
             long expected = upload.length + headerSize;
-            assertEquals(expected, metrics.getUploadSizes().getStatistics().getMax());
+            assertThat(metrics.getUploadSizes().getStatistics().getMax()).isEqualTo(expected);
         }
     }
 
     @Test
-    public void testUploadFailuresCounter() throws Exception {
-        File file = temporaryFolder.newFile(); // using file instead of folder will cause a failure
+    void testUploadFailuresCounter() throws Exception {
+        // using file instead of folder will cause a failure
+        File file = Files.createTempFile(tempFolder, UUID.randomUUID().toString(), "").toFile();
         ChangelogStorageMetricGroup metrics =
                 new ChangelogStorageMetricGroup(createUnregisteredTaskManagerJobMetricGroup());
         try (FsStateChangelogStorage storage =
@@ -121,17 +122,17 @@ public class ChangelogStorageMetricsTest {
                     // ignore
                 }
             }
-            assertEquals(numUploads, metrics.getUploadFailuresCounter().getCount());
+            assertThat(metrics.getUploadFailuresCounter().getCount()).isEqualTo(numUploads);
         }
     }
 
     @Test
-    public void testUploadBatchSizes() throws Exception {
+    void testUploadBatchSizes() throws Exception {
         int numWriters = 5, numUploads = 5;
 
         ChangelogStorageMetricGroup metrics =
                 new ChangelogStorageMetricGroup(createUnregisteredTaskManagerJobMetricGroup());
-        Path basePath = Path.fromLocalFile(temporaryFolder.newFolder());
+        Path basePath = Path.fromLocalFile(tempFolder.toFile());
         StateChangeFsUploader uploader =
                 new StateChangeFsUploader(basePath, basePath.getFileSystem(), false, 100, metrics);
         ManuallyTriggeredScheduledExecutorService scheduler =
@@ -167,15 +168,17 @@ public class ChangelogStorageMetricsTest {
                 // now the uploads should be grouped and executed at once
                 scheduler.triggerScheduledTasks();
             }
-            assertEquals(numWriters, metrics.getUploadBatchSizes().getStatistics().getMin());
-            assertEquals(numWriters, metrics.getUploadBatchSizes().getStatistics().getMax());
+            assertThat(metrics.getUploadBatchSizes().getStatistics().getMin())
+                    .isEqualTo(numWriters);
+            assertThat(metrics.getUploadBatchSizes().getStatistics().getMax())
+                    .isEqualTo(numWriters);
         } finally {
             storage.close();
         }
     }
 
     @Test
-    public void testAttemptsPerUpload() throws Exception {
+    void testAttemptsPerUpload() throws Exception {
         int numUploads = 7, maxAttempts = 3;
 
         ChangelogStorageMetricGroup metrics =
@@ -202,15 +205,15 @@ public class ChangelogStorageMetricsTest {
                 writer.persist(from).get();
             }
             HistogramStatistics histogram = metrics.getAttemptsPerUpload().getStatistics();
-            assertEquals(maxAttempts, histogram.getMin());
-            assertEquals(maxAttempts, histogram.getMax());
+            assertThat(histogram.getMin()).isEqualTo(maxAttempts);
+            assertThat(histogram.getMax()).isEqualTo(maxAttempts);
         } finally {
             storage.close();
         }
     }
 
     @Test
-    public void testQueueSize() throws Exception {
+    void testQueueSize() throws Exception {
         AtomicReference<Gauge<Integer>> queueSizeGauge = new AtomicReference<>();
         ChangelogStorageMetricGroup metrics =
                 new ChangelogStorageMetricGroup(
@@ -228,7 +231,7 @@ public class ChangelogStorageMetricsTest {
                                 new JobID(),
                                 "test"));
 
-        Path path = Path.fromLocalFile(temporaryFolder.newFolder());
+        Path path = Path.fromLocalFile(tempFolder.toFile());
         StateChangeFsUploader delegate =
                 new StateChangeFsUploader(path, path.getFileSystem(), false, 100, metrics);
         ManuallyTriggeredScheduledExecutorService scheduler =
@@ -252,9 +255,9 @@ public class ChangelogStorageMetricsTest {
                 writer.append(0, new byte[] {0});
                 writer.persist(from);
             }
-            assertEquals(numUploads, (int) queueSizeGauge.get().getValue());
+            assertThat((int) queueSizeGauge.get().getValue()).isEqualTo(numUploads);
             scheduler.triggerScheduledTasks();
-            assertEquals(0, (int) queueSizeGauge.get().getValue());
+            assertThat((int) queueSizeGauge.get().getValue()).isEqualTo(0);
         }
     }
 
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java
index 602155d..e23945c 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.changelog.fs.BatchingStateChangeUploadSchedulerTest.Bloc
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
 import org.apache.flink.runtime.state.changelog.inmemory.StateChangelogStorageTest;
@@ -28,30 +29,29 @@ import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
 import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
 
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.Test;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.stream.Stream;
 
 import static org.apache.flink.changelog.fs.UnregisteredChangelogStorageMetricGroup.createUnregisteredChangelogStorageMetricGroup;
 
 /** {@link FsStateChangelogStorage} test. */
-@RunWith(Parameterized.class)
-public class FsStateChangelogStorageTest extends StateChangelogStorageTest {
-    @Parameterized.Parameter public boolean compression;
+public class FsStateChangelogStorageTest
+        extends StateChangelogStorageTest<ChangelogStateHandleStreamImpl> {
 
-    @Parameterized.Parameters(name = "use compression = {0}")
-    public static Object[] parameters() {
-        return new Object[] {true, false};
+    public static Stream<Boolean> parameters() {
+        return Stream.of(true, false);
     }
 
     @Override
-    protected StateChangelogStorage<?> getFactory() throws IOException {
+    protected StateChangelogStorage<ChangelogStateHandleStreamImpl> getFactory(
+            boolean compression, File temporaryFolder) throws IOException {
         return new FsStateChangelogStorage(
-                Path.fromLocalFile(temporaryFolder.newFolder()),
+                Path.fromLocalFile(temporaryFolder),
                 compression,
                 1024 * 1024 * 10,
                 createUnregisteredChangelogStorageMetricGroup());
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
index f3e576d..b005572 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
@@ -23,27 +23,23 @@ import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
 import org.apache.flink.util.function.ThrowingConsumer;
 
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.UUID;
+import java.util.stream.Stream;
 
-import static java.util.Arrays.asList;
 import static org.apache.flink.changelog.fs.FsStateChangelogWriterSqnTest.WriterSqnTestSettings.of;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Test of incrementing {@link SequenceNumber sequence numbers} by {@link FsStateChangelogWriter}.
  */
-@RunWith(Parameterized.class)
 public class FsStateChangelogWriterSqnTest {
 
-    @Parameterized.Parameters(name = "{0}")
-    public static List<WriterSqnTestSettings> getSettings() {
-        return asList(
+    private static Stream<WriterSqnTestSettings> getSettings() {
+        return Stream.of(
                 of(StateChangelogWriter::nextSequenceNumber, "nextSequenceNumber")
                         .withAppendCall(false)
                         .expectIncrement(false),
@@ -73,14 +69,9 @@ public class FsStateChangelogWriterSqnTest {
                         .expectIncrement(true));
     }
 
-    private final WriterSqnTestSettings test;
-
-    public FsStateChangelogWriterSqnTest(WriterSqnTestSettings test) {
-        this.test = test;
-    }
-
-    @Test
-    public void runTest() throws IOException {
+    @MethodSource("getSettings")
+    @ParameterizedTest(name = "writerSqnTestSettings = {0}")
+    void runTest(WriterSqnTestSettings writerSqnTestSettings) throws IOException {
         try (FsStateChangelogWriter writer =
                 new FsStateChangelogWriter(
                         UUID.randomUUID(),
@@ -89,28 +80,19 @@ public class FsStateChangelogWriterSqnTest {
                                 new TestingStateChangeUploader()),
                         Long.MAX_VALUE,
                         new SyncMailboxExecutor())) {
-            if (test.withAppend) {
+            if (writerSqnTestSettings.withAppend) {
                 append(writer);
             }
-            test.action.accept(writer);
-            assertEquals(
-                    getMessage(),
-                    test.expectIncrement
-                            ? writer.initialSequenceNumber().next()
-                            : writer.initialSequenceNumber(),
-                    writer.lastAppendedSqnUnsafe());
+            writerSqnTestSettings.action.accept(writer);
+            assertThat(writer.lastAppendedSqnUnsafe())
+                    .as(writerSqnTestSettings.getMessage())
+                    .isEqualTo(
+                            writerSqnTestSettings.expectIncrement
+                                    ? writer.initialSequenceNumber().next()
+                                    : writer.initialSequenceNumber());
         }
     }
 
-    private String getMessage() {
-        return test.name
-                + " should"
-                + (test.expectIncrement ? " " : " NOT ")
-                + "increment SQN"
-                + (test.expectIncrement ? " after " : " without ")
-                + "appends";
-    }
-
     static class WriterSqnTestSettings {
         private final String name;
         private final ThrowingConsumer<FsStateChangelogWriter, IOException> action;
@@ -123,6 +105,15 @@ public class FsStateChangelogWriterSqnTest {
             this.action = action;
         }
 
+        public String getMessage() {
+            return this.name
+                    + " should"
+                    + (this.expectIncrement ? " " : " NOT ")
+                    + "increment SQN"
+                    + (this.expectIncrement ? " after " : " without ")
+                    + "appends";
+        }
+
         public static WriterSqnTestSettings of(
                 ThrowingConsumer<FsStateChangelogWriter, IOException> action, String name) {
             return new WriterSqnTestSettings(name, action);
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
index b0a49dd..78015e2 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.util.function.BiConsumerWithException;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.Random;
@@ -33,16 +33,16 @@ import java.util.concurrent.ExecutionException;
 
 import static org.apache.flink.shaded.guava30.com.google.common.collect.Iterables.getOnlyElement;
 import static org.apache.flink.util.ExceptionUtils.rethrowIOException;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** {@link FsStateChangelogWriter} test. */
-public class FsStateChangelogWriterTest {
+class FsStateChangelogWriterTest {
     private static final int KEY_GROUP = 0;
     private final Random random = new Random();
 
     @Test
-    public void testAppend() throws Exception {
+    void testAppend() throws Exception {
         withWriter(
                 (writer, uploader) -> {
                     writer.append(KEY_GROUP, getBytes());
@@ -53,7 +53,7 @@ public class FsStateChangelogWriterTest {
     }
 
     @Test
-    public void testPreUpload() throws Exception {
+    void testPreUpload() throws Exception {
         int threshold = 1000;
         withWriter(
                 threshold,
@@ -68,7 +68,7 @@ public class FsStateChangelogWriterTest {
     }
 
     @Test
-    public void testPersist() throws Exception {
+    void testPersist() throws Exception {
         withWriter(
                 (writer, uploader) -> {
                     byte[] bytes = getBytes();
@@ -76,17 +76,17 @@ public class FsStateChangelogWriterTest {
                             writer.persist(append(writer, bytes));
                     assertSubmittedOnly(uploader, bytes);
                     uploader.completeUpload();
-                    assertArrayEquals(
-                            bytes,
-                            getOnlyElement(future.get().getHandlesAndOffsets())
-                                    .f0
-                                    .asBytesIfInMemory()
-                                    .get());
+                    assertThat(
+                                    getOnlyElement(future.get().getHandlesAndOffsets())
+                                            .f0
+                                            .asBytesIfInMemory()
+                                            .get())
+                            .isEqualTo(bytes);
                 });
     }
 
     @Test
-    public void testPersistAgain() throws Exception {
+    void testPersistAgain() throws Exception {
         withWriter(
                 (writer, uploader) -> {
                     byte[] bytes = getBytes();
@@ -100,7 +100,7 @@ public class FsStateChangelogWriterTest {
     }
 
     @Test
-    public void testNoReUploadBeforeCompletion() throws Exception {
+    void testNoReUploadBeforeCompletion() throws Exception {
         withWriter(
                 (writer, uploader) -> {
                     byte[] bytes = getBytes();
@@ -113,7 +113,7 @@ public class FsStateChangelogWriterTest {
     }
 
     @Test
-    public void testPersistNewlyAppended() throws Exception {
+    void testPersistNewlyAppended() throws Exception {
         withWriter(
                 (writer, uploader) -> {
                     SequenceNumber sqn = append(writer, getBytes());
@@ -128,7 +128,7 @@ public class FsStateChangelogWriterTest {
 
     /** Emulates checkpoint abortion followed by a new checkpoint. */
     @Test
-    public void testPersistAfterReset() throws Exception {
+    void testPersistAfterReset() throws Exception {
         withWriter(
                 (writer, uploader) -> {
                     byte[] bytes = getBytes();
@@ -140,36 +140,43 @@ public class FsStateChangelogWriterTest {
                 });
     }
 
-    @Test(expected = IOException.class)
-    public void testPersistFailure() throws Exception {
-        withWriter(
-                (writer, uploader) -> {
-                    byte[] bytes = getBytes();
-                    SequenceNumber sqn = append(writer, bytes);
-                    CompletableFuture<ChangelogStateHandleStreamImpl> future = writer.persist(sqn);
-                    uploader.failUpload(new RuntimeException("test"));
-                    try {
-                        future.get();
-                    } catch (ExecutionException e) {
-                        rethrowIOException(e.getCause());
-                    }
-                });
+    @Test
+    void testPersistFailure() {
+        assertThatThrownBy(
+                        () ->
+                                withWriter(
+                                        (writer, uploader) -> {
+                                            byte[] bytes = getBytes();
+                                            SequenceNumber sqn = append(writer, bytes);
+                                            CompletableFuture<ChangelogStateHandleStreamImpl>
+                                                    future = writer.persist(sqn);
+                                            uploader.failUpload(new RuntimeException("test"));
+                                            try {
+                                                future.get();
+                                            } catch (ExecutionException e) {
+                                                rethrowIOException(e.getCause());
+                                            }
+                                        }))
+                .isInstanceOf(IOException.class);
     }
 
-    @Test(expected = IOException.class)
-    public void testPersistFailedChanges() throws Exception {
-        withWriter(
-                (writer, uploader) -> {
-                    byte[] bytes = getBytes();
-                    SequenceNumber sqn = append(writer, bytes);
-                    writer.persist(sqn); // future result ignored
-                    uploader.failUpload(new RuntimeException("test"));
-                    writer.persist(sqn); // should fail right away
-                });
+    @Test
+    void testPersistFailedChanges() {
+        assertThatThrownBy(
+                        () ->
+                                withWriter(
+                                        (writer, uploader) -> {
+                                            byte[] bytes = getBytes();
+                                            SequenceNumber sqn = append(writer, bytes);
+                                            writer.persist(sqn); // future result ignored
+                                            uploader.failUpload(new RuntimeException("test"));
+                                            writer.persist(sqn); // should fail right away
+                                        }))
+                .isInstanceOf(IOException.class);
     }
 
     @Test
-    public void testPersistNonFailedChanges() throws Exception {
+    void testPersistNonFailedChanges() throws Exception {
         withWriter(
                 (writer, uploader) -> {
                     byte[] bytes = getBytes();
@@ -184,14 +191,17 @@ public class FsStateChangelogWriterTest {
                 });
     }
 
-    @Test(expected = IllegalArgumentException.class)
-    public void testTruncate() throws Exception {
-        withWriter(
-                (writer, uploader) -> {
-                    SequenceNumber sqn = append(writer, getBytes());
-                    writer.truncate(sqn.next());
-                    writer.persist(sqn);
-                });
+    @Test
+    void testTruncate() {
+        assertThatThrownBy(
+                        () ->
+                                withWriter(
+                                        (writer, uploader) -> {
+                                            SequenceNumber sqn = append(writer, getBytes());
+                                            writer.truncate(sqn.next());
+                                            writer.persist(sqn);
+                                        }))
+                .isInstanceOf(IllegalArgumentException.class);
     }
 
     private void withWriter(
@@ -219,9 +229,8 @@ public class FsStateChangelogWriterTest {
     }
 
     private void assertSubmittedOnly(TestingStateChangeUploader uploader, byte[] bytes) {
-        assertArrayEquals(
-                bytes,
-                getOnlyElement(getOnlyElement(uploader.getUploaded()).getChanges()).getChange());
+        assertThat(getOnlyElement(getOnlyElement(uploader.getUploaded()).getChanges()).getChange())
+                .isEqualTo(bytes);
     }
 
     private SequenceNumber append(FsStateChangelogWriter writer, byte[] bytes) throws IOException {
@@ -241,6 +250,6 @@ public class FsStateChangelogWriterTest {
     }
 
     private static void assertNoUpload(TestingStateChangeUploader uploader, String message) {
-        assertTrue(message, uploader.getUploaded().isEmpty());
+        assertThat(uploader.getUploaded()).isEmpty();
     }
 }
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java
index 9e838b3..3727c81 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java
@@ -24,7 +24,8 @@ import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.util.function.RunnableWithException;
 import org.apache.flink.util.function.ThrowingConsumer;
 
-import org.junit.Test;
+import org.assertj.core.data.Percentage;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -44,11 +45,11 @@ import java.util.stream.IntStream;
 import static java.util.Collections.singletonList;
 import static java.util.stream.Collectors.toList;
 import static org.apache.flink.changelog.fs.UnregisteredChangelogStorageMetricGroup.createUnregisteredChangelogStorageMetricGroup;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
 /** {@link RetryingExecutor} test. */
-public class RetryingExecutorTest {
+class RetryingExecutorTest {
 
     private static final ThrowingConsumer<Integer, Exception> FAILING_TASK =
             attempt -> {
@@ -56,17 +57,17 @@ public class RetryingExecutorTest {
             };
 
     @Test
-    public void testNoRetries() throws Exception {
+    void testNoRetries() throws Exception {
         testPolicy(1, RetryPolicy.NONE, FAILING_TASK);
     }
 
     @Test
-    public void testFixedRetryLimit() throws Exception {
+    void testFixedRetryLimit() throws Exception {
         testPolicy(5, RetryPolicy.fixed(5, 0, 0), FAILING_TASK);
     }
 
     @Test
-    public void testDiscardOnTimeout() throws Exception {
+    void testDiscardOnTimeout() throws Exception {
         int timeoutMs = 5;
         int numAttempts = 7;
         int successfulAttempt = numAttempts - 1;
@@ -115,14 +116,13 @@ public class RetryingExecutorTest {
                 Thread.sleep(10);
             }
         }
-        assertEquals(singletonList(successfulAttempt), completed);
-        assertEquals(
-                IntStream.range(0, successfulAttempt).boxed().collect(toList()),
-                discarded.stream().sorted().collect(toList()));
+        assertThat(singletonList(successfulAttempt)).isEqualTo(completed);
+        assertThat(IntStream.range(0, successfulAttempt).boxed().collect(toList()))
+                .isEqualTo(discarded.stream().sorted().collect(toList()));
     }
 
     @Test
-    public void testFixedRetrySuccess() throws Exception {
+    void testFixedRetrySuccess() throws Exception {
         int successfulAttempt = 3;
         int maxAttempts = successfulAttempt * 2;
         testPolicy(
@@ -136,7 +136,7 @@ public class RetryingExecutorTest {
     }
 
     @Test
-    public void testNonRetryableException() throws Exception {
+    void testNonRetryableException() throws Exception {
         testPolicy(
                 1,
                 RetryPolicy.fixed(Integer.MAX_VALUE, 0, 0),
@@ -146,7 +146,7 @@ public class RetryingExecutorTest {
     }
 
     @Test
-    public void testRetryDelay() throws Exception {
+    void testRetryDelay() throws Exception {
         int delayAfterFailure = 123;
         int numAttempts = 2;
         testPolicy(
@@ -161,7 +161,7 @@ public class RetryingExecutorTest {
                     @Override
                     public ScheduledFuture<?> schedule(
                             Runnable command, long delay, TimeUnit unit) {
-                        assertEquals(delayAfterFailure, delay);
+                        assertThat(delay).isEqualTo(delayAfterFailure);
                         command.run();
                         return CompletedScheduledFuture.create(null);
                     }
@@ -169,7 +169,7 @@ public class RetryingExecutorTest {
     }
 
     @Test
-    public void testNoRetryDelayIfTimeout() throws Exception {
+    void testNoRetryDelayIfTimeout() throws Exception {
         int delayAfterFailure = 123;
         int numAttempts = 2;
         testPolicy(
@@ -191,7 +191,7 @@ public class RetryingExecutorTest {
     }
 
     @Test
-    public void testTimeout() throws Exception {
+    void testTimeout() throws Exception {
         int numAttempts = 2;
         int timeout = 500;
         CompletableFuture<Long> firstStart = new CompletableFuture<>();
@@ -209,11 +209,9 @@ public class RetryingExecutorTest {
                     }
                 },
                 Executors.newScheduledThreadPool(2));
-        assertEquals(
-                timeout,
-                ((double) secondStart.get() - firstStart.get()) / 1_000_000,
-                timeout
-                        * 0.75d /* future completion can be delayed arbitrarily causing start delta be less than timeout */);
+        /* future completion can be delayed arbitrarily, making the start delta less than timeout */
+        assertThat(((double) secondStart.get() - firstStart.get()) / 1_000_000)
+                .isCloseTo(timeout, Percentage.withPercentage(75));
     }
 
     private void testPolicy(
@@ -246,7 +244,7 @@ public class RetryingExecutorTest {
                             }));
             firstAttemptCompletedLatch.await(); // before closing executor
         }
-        assertEquals(expectedAttempts, attemptsMade.get());
+        assertThat(attemptsMade.get()).isEqualTo(expectedAttempts);
     }
 
     private static RetriableAction<?> runnableToAction(RunnableWithException action) {
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/TestingStateChangeUploader.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/TestingStateChangeUploader.java
index 8e29dd1..4bedadf 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/TestingStateChangeUploader.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/TestingStateChangeUploader.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import static java.util.Collections.emptyMap;
 import static java.util.stream.Collectors.toList;
 
+/** {@link StateChangeUploader} implementation for use in tests. */
 class TestingStateChangeUploader implements StateChangeUploader {
     private final Collection<StateChangeSet> uploaded = new CopyOnWriteArrayList<>();
     private final List<UploadTask> tasks;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
index d7ddf40..664239e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
@@ -28,10 +28,11 @@ import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
 import org.apache.flink.util.CloseableIterator;
 
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -45,35 +46,44 @@ import java.util.stream.Stream;
 import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toMap;
 import static java.util.stream.StreamSupport.stream;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** {@link InMemoryStateChangelogStorage} test. */
 public class StateChangelogStorageTest<T extends ChangelogStateHandle> {
 
     private final Random random = new Random();
 
-    @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-    @Test(expected = IllegalStateException.class)
-    public void testNoAppendAfterClose() throws IOException {
-        StateChangelogWriter<?> writer =
-                getFactory()
-                        .createWriter(
-                                new OperatorID().toString(),
-                                KeyGroupRange.of(0, 0),
-                                new SyncMailboxExecutor());
-        writer.close();
-        writer.append(0, new byte[0]);
+    @TempDir public File temporaryFolder;
+
+    public static Stream<Boolean> parameters() {
+        return Stream.of(true);
+    }
+
+    @MethodSource("parameters")
+    @ParameterizedTest(name = "compression = {0}")
+    public void testNoAppendAfterClose(boolean compression) throws IOException {
+        assertThatThrownBy(
+                        () -> {
+                            StateChangelogWriter<?> writer =
+                                    getFactory(compression, temporaryFolder)
+                                            .createWriter(
+                                                    new OperatorID().toString(),
+                                                    KeyGroupRange.of(0, 0),
+                                                    new SyncMailboxExecutor());
+                            writer.close();
+                            writer.append(0, new byte[0]);
+                        })
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test
-    public void testWriteAndRead() throws Exception {
+    @MethodSource("parameters")
+    @ParameterizedTest(name = "compression = {0}")
+    public void testWriteAndRead(boolean compression) throws Exception {
         KeyGroupRange kgRange = KeyGroupRange.of(0, 5);
         Map<Integer, List<byte[]>> appendsByKeyGroup = generateAppends(kgRange, 10, 20);
 
-        try (StateChangelogStorage<T> client = getFactory();
+        try (StateChangelogStorage<T> client = getFactory(compression, temporaryFolder);
                 StateChangelogWriter<T> writer =
                         client.createWriter(
                                 new OperatorID().toString(), kgRange, new SyncMailboxExecutor())) {
@@ -95,16 +105,16 @@ public class StateChangelogStorageTest<T extends ChangelogStateHandle> {
 
     private void assertByteMapsEqual(
             Map<Integer, List<byte[]>> expected, Map<Integer, List<byte[]>> actual) {
-        assertEquals(expected.size(), actual.size());
+        assertThat(actual).hasSameSizeAs(expected);
         for (Map.Entry<Integer, List<byte[]>> e : expected.entrySet()) {
             List<byte[]> expectedList = e.getValue();
             List<byte[]> actualList = actual.get(e.getKey());
             Iterator<byte[]> ite = expectedList.iterator(), ale = actualList.iterator();
             while (ite.hasNext() && ale.hasNext()) {
-                assertArrayEquals(ite.next(), ale.next());
+                assertThat(ale.next()).isEqualTo(ite.next());
             }
-            assertFalse(ite.hasNext());
-            assertFalse(ale.hasNext());
+            assertThat(ite.hasNext()).isFalse();
+            assertThat(ale.hasNext()).isFalse();
         }
     }
 
@@ -139,7 +149,8 @@ public class StateChangelogStorageTest<T extends ChangelogStateHandle> {
         return bytes;
     }
 
-    protected StateChangelogStorage<T> getFactory() throws IOException {
+    protected StateChangelogStorage<T> getFactory(boolean compression, File temporaryFolder)
+            throws IOException {
         return (StateChangelogStorage<T>) new InMemoryStateChangelogStorage();
     }
 }