You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/03/22 07:04:50 UTC
[flink] branch master updated: [FLINK-25549][state/changelog] Migrate flink-dstl to use JUnit5
This is an automated email from the ASF dual-hosted git repository.
gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 480956f [FLINK-25549][state/changelog] Migrate flink-dstl to use JUnit5
480956f is described below
commit 480956f3590426ead0578781d5ef36fc3a56b8df
Author: Roc Marshal <fl...@126.com>
AuthorDate: Fri Mar 4 08:24:53 2022 +0800
[FLINK-25549][state/changelog] Migrate flink-dstl to use JUnit5
This closes #18978.
---
.../fs/BatchingStateChangeUploadSchedulerTest.java | 105 +++++++++---------
.../changelog/fs/ChangelogStorageMetricsTest.java | 59 ++++++-----
.../changelog/fs/FsStateChangelogStorageTest.java | 22 ++--
.../fs/FsStateChangelogWriterSqnTest.java | 61 +++++------
.../changelog/fs/FsStateChangelogWriterTest.java | 117 +++++++++++----------
.../flink/changelog/fs/RetryingExecutorTest.java | 44 ++++----
.../changelog/fs/TestingStateChangeUploader.java | 1 +
.../inmemory/StateChangelogStorageTest.java | 63 ++++++-----
8 files changed, 243 insertions(+), 229 deletions(-)
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
index 2b13d91..621d46b 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.function.BiConsumerWithException;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.time.Duration;
@@ -56,19 +56,18 @@ import static java.util.stream.Collectors.toMap;
import static org.apache.flink.changelog.fs.UnregisteredChangelogStorageMetricGroup.createUnregisteredChangelogStorageMetricGroup;
import static org.apache.flink.util.ExceptionUtils.findThrowable;
import static org.apache.flink.util.ExceptionUtils.rethrow;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
/** {@link BatchingStateChangeUploadScheduler} test. */
-public class BatchingStateChangeUploadSchedulerTest {
+class BatchingStateChangeUploadSchedulerTest {
private static final int MAX_BYTES_IN_FLIGHT = 10_000;
private final Random random = new Random();
@Test
- public void testNoDelayAndThreshold() throws Exception {
+ void testNoDelayAndThreshold() throws Exception {
withStore(
0,
0,
@@ -89,7 +88,7 @@ public class BatchingStateChangeUploadSchedulerTest {
}
@Test
- public void testSizeThreshold() throws Exception {
+ void testSizeThreshold() throws Exception {
int numChanges = 7;
int changeSize = 11;
int threshold = changeSize * numChanges;
@@ -108,14 +107,14 @@ public class BatchingStateChangeUploadSchedulerTest {
if (runningSize >= threshold) {
assertSaved(probe, expected);
} else {
- assertTrue(probe.getUploaded().isEmpty());
+ assertThat(probe.getUploaded()).isEmpty();
}
}
});
}
@Test
- public void testDelay() throws Exception {
+ void testDelay() throws Exception {
int delayMs = 50;
ManuallyTriggeredScheduledExecutorService scheduler =
new ManuallyTriggeredScheduledExecutorService();
@@ -128,20 +127,17 @@ public class BatchingStateChangeUploadSchedulerTest {
scheduler.triggerAll();
List<StateChangeSet> changeSets = getChanges(4);
upload(store, changeSets);
- assertTrue(probe.getUploaded().isEmpty());
- assertTrue(
- scheduler.getAllNonPeriodicScheduledTask().stream()
- .anyMatch(
- scheduled ->
- scheduled.getDelay(MILLISECONDS) == delayMs));
+ assertThat(probe.getUploaded()).isEmpty();
+ assertThat(scheduler.getAllNonPeriodicScheduledTask())
+ .anyMatch(scheduled -> scheduled.getDelay(MILLISECONDS) == delayMs);
scheduler.triggerAllNonPeriodicTasks();
- assertEquals(changeSets, probe.getUploaded());
+ assertThat(probe.getUploaded()).isEqualTo(changeSets);
});
}
/** Test integration with {@link RetryingExecutor}. */
@Test
- public void testRetry() throws Exception {
+ void testRetry() throws Exception {
final int maxAttempts = 5;
try (BatchingStateChangeUploadScheduler store =
@@ -184,7 +180,7 @@ public class BatchingStateChangeUploadSchedulerTest {
}
@Test
- public void testUploadTimeout() throws Exception {
+ void testUploadTimeout() throws Exception {
AtomicReference<List<SequenceNumber>> failed = new AtomicReference<>();
UploadTask upload =
new UploadTask(getChanges(4), unused -> {}, (sqn, error) -> failed.set(sqn));
@@ -201,12 +197,12 @@ public class BatchingStateChangeUploadSchedulerTest {
}
}
- assertTrue(upload.finished.get());
- assertEquals(
- upload.changeSets.stream()
- .map(StateChangeSet::getSequenceNumber)
- .collect(Collectors.toSet()),
- new HashSet<>(failed.get()));
+ assertThat(upload.finished.get()).isTrue();
+ assertThat(
+ upload.changeSets.stream()
+ .map(StateChangeSet::getSequenceNumber)
+ .collect(Collectors.toSet()))
+ .isEqualTo(new HashSet<>(failed.get()));
}
@Test
@@ -236,19 +232,20 @@ public class BatchingStateChangeUploadSchedulerTest {
}
}
- assertTrue(upload.finished.get());
- assertEquals(
- upload.changeSets.stream()
- .map(StateChangeSet::getSequenceNumber)
- .collect(Collectors.toSet()),
- succeeded.get().stream()
- .map(UploadResult::getSequenceNumber)
- .collect(Collectors.toSet()));
- assertTrue(failed.get().isEmpty());
+ assertThat(upload.finished.get()).isTrue();
+ assertThat(
+ upload.changeSets.stream()
+ .map(StateChangeSet::getSequenceNumber)
+ .collect(Collectors.toSet()))
+ .isEqualTo(
+ succeeded.get().stream()
+ .map(UploadResult::getSequenceNumber)
+ .collect(Collectors.toSet()));
+ assertThat(failed.get()).isEmpty();
}
- @Test(expected = RejectedExecutionException.class)
- public void testErrorHandling() throws Exception {
+ @Test
+ void testErrorHandling() throws Exception {
TestingStateChangeUploader probe = new TestingStateChangeUploader();
DirectScheduledExecutorService scheduler = new DirectScheduledExecutorService();
try (BatchingStateChangeUploadScheduler store =
@@ -265,12 +262,13 @@ public class BatchingStateChangeUploadSchedulerTest {
.getAttemptsPerUpload()),
createUnregisteredChangelogStorageMetricGroup())) {
scheduler.shutdown();
- upload(store, getChanges(4));
+ assertThatThrownBy(() -> upload(store, getChanges(4)))
+ .isInstanceOf(RejectedExecutionException.class);
}
}
@Test
- public void testClose() throws Exception {
+ void testClose() throws Exception {
TestingStateChangeUploader probe = new TestingStateChangeUploader();
DirectScheduledExecutorService scheduler = new DirectScheduledExecutorService();
DirectScheduledExecutorService retryScheduler = new DirectScheduledExecutorService();
@@ -287,13 +285,13 @@ public class BatchingStateChangeUploadSchedulerTest {
.getAttemptsPerUpload()),
createUnregisteredChangelogStorageMetricGroup())
.close();
- assertTrue(probe.isClosed());
- assertTrue(scheduler.isShutdown());
- assertTrue(retryScheduler.isShutdown());
+ assertThat(probe.isClosed()).isTrue();
+ assertThat(scheduler.isShutdown()).isTrue();
+ assertThat(retryScheduler.isShutdown()).isTrue();
}
@Test
- public void testBackPressure() throws Exception {
+ void testBackPressure() throws Exception {
int sizeLimit = MAX_BYTES_IN_FLIGHT;
CompletableFuture<TestingStateChangeUploader> thresholdExceededFuture =
new CompletableFuture<>();
@@ -306,13 +304,14 @@ public class BatchingStateChangeUploadSchedulerTest {
getChanges(sizeLimit / 2).stream(),
getChanges(sizeLimit / 2).stream())
.collect(Collectors.toList());
- assertTrue(uploader.getAvailabilityProvider().isAvailable());
- assertTrue(uploader.getAvailabilityProvider().isApproximatelyAvailable());
+ assertThat(uploader.getAvailabilityProvider().isAvailable()).isTrue();
+ assertThat(uploader.getAvailabilityProvider().isApproximatelyAvailable())
+ .isTrue();
upload(uploader, changes1);
assertSaved(probe, changes1); // sent to upload, not finished yet
thresholdExceededFuture.complete(probe);
List<StateChangeSet> changes2 = getChanges(1);
- assertFalse(uploader.getAvailabilityProvider().isAvailable());
+ assertThat(uploader.getAvailabilityProvider().isAvailable()).isFalse();
upload(uploader, changes2); // should block until capacity released
assertSaved(probe, changes1, changes2);
};
@@ -322,14 +321,14 @@ public class BatchingStateChangeUploadSchedulerTest {
TestingStateChangeUploader probe = thresholdExceededFuture.get();
int uploadedInTheBeginning = probe.getUploaded().size();
Thread.sleep(500); // allow failing, i.e. to proceed with upload
- assertEquals(uploadedInTheBeginning, probe.getUploaded().size());
+ assertThat(probe.getUploaded().size()).isEqualTo(uploadedInTheBeginning);
probe.completeUpload(); // release capacity
uploadFuture.join();
- assertTrue(uploadedInTheBeginning < probe.getUploaded().size());
+ assertThat(probe.getUploaded().size()).isGreaterThan(uploadedInTheBeginning);
}
@Test
- public void testInterruptedWhenBackPressured() throws Exception {
+ void testInterruptedWhenBackPressured() throws Exception {
int limit = MAX_BYTES_IN_FLIGHT;
TestScenario test =
(uploader, probe) -> {
@@ -342,7 +341,7 @@ public class BatchingStateChangeUploadSchedulerTest {
fail("upload shouldn't succeed after exceeding the limit");
} catch (IOException e) {
if (findThrowable(e, InterruptedException.class).isPresent()) {
- assertTrue(probe.getUploaded().isEmpty());
+ assertThat(probe.getUploaded()).isEmpty();
} else {
rethrow(e);
}
@@ -404,9 +403,11 @@ public class BatchingStateChangeUploadSchedulerTest {
@SafeVarargs
private final void assertSaved(
TestingStateChangeUploader probe, List<StateChangeSet>... expected) {
- assertEquals(
- Arrays.stream(expected).flatMap(Collection::stream).collect(Collectors.toList()),
- new ArrayList<>(probe.getUploaded()));
+ assertThat(new ArrayList<>(probe.getUploaded()))
+ .isEqualTo(
+ Arrays.stream(expected)
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList()));
}
private interface TestScenario
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
index 3444139..24280c8 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
@@ -29,15 +29,16 @@ import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -47,42 +48,41 @@ import static org.apache.flink.changelog.fs.ChangelogStorageMetricGroup.CHANGELO
import static org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup;
import static org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup;
import static org.apache.flink.runtime.state.KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
/** {@link ChangelogStorageMetricGroup} test. */
public class ChangelogStorageMetricsTest {
- @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @TempDir java.nio.file.Path tempFolder;
@Test
- public void testUploadsCounter() throws Exception {
+ void testUploadsCounter() throws Exception {
ChangelogStorageMetricGroup metrics =
new ChangelogStorageMetricGroup(createUnregisteredTaskManagerJobMetricGroup());
try (FsStateChangelogStorage storage =
new FsStateChangelogStorage(
- Path.fromLocalFile(temporaryFolder.newFolder()), false, 100, metrics)) {
+ Path.fromLocalFile(tempFolder.toFile()), false, 100, metrics)) {
FsStateChangelogWriter writer = createWriter(storage);
-
int numUploads = 5;
for (int i = 0; i < numUploads; i++) {
SequenceNumber from = writer.nextSequenceNumber();
writer.append(0, new byte[] {0, 1, 2, 3});
writer.persist(from).get();
}
- assertEquals(numUploads, metrics.getUploadsCounter().getCount());
- assertTrue(metrics.getUploadLatenciesNanos().getStatistics().getMin() > 0);
+ assertThat(metrics.getUploadsCounter().getCount()).isEqualTo(numUploads);
+ assertThat(metrics.getUploadLatenciesNanos().getStatistics().getMin()).isGreaterThan(0);
}
}
@Test
- public void testUploadSizes() throws Exception {
+ void testUploadSizes() throws Exception {
ChangelogStorageMetricGroup metrics =
new ChangelogStorageMetricGroup(createUnregisteredTaskManagerJobMetricGroup());
try (FsStateChangelogStorage storage =
new FsStateChangelogStorage(
- Path.fromLocalFile(temporaryFolder.newFolder()), false, 100, metrics)) {
+ Path.fromLocalFile(tempFolder.toFile()), false, 100, metrics)) {
FsStateChangelogWriter writer = createWriter(storage);
// upload single byte to infer header size
@@ -98,13 +98,14 @@ public class ChangelogStorageMetricsTest {
writer.persist(from).get();
}
long expected = upload.length + headerSize;
- assertEquals(expected, metrics.getUploadSizes().getStatistics().getMax());
+ assertThat(metrics.getUploadSizes().getStatistics().getMax()).isEqualTo(expected);
}
}
@Test
- public void testUploadFailuresCounter() throws Exception {
- File file = temporaryFolder.newFile(); // using file instead of folder will cause a failure
+ void testUploadFailuresCounter() throws Exception {
+ // using file instead of folder will cause a failure
+ File file = Files.createTempFile(tempFolder, UUID.randomUUID().toString(), "").toFile();
ChangelogStorageMetricGroup metrics =
new ChangelogStorageMetricGroup(createUnregisteredTaskManagerJobMetricGroup());
try (FsStateChangelogStorage storage =
@@ -121,17 +122,17 @@ public class ChangelogStorageMetricsTest {
// ignore
}
}
- assertEquals(numUploads, metrics.getUploadFailuresCounter().getCount());
+ assertThat(metrics.getUploadFailuresCounter().getCount()).isEqualTo(numUploads);
}
}
@Test
- public void testUploadBatchSizes() throws Exception {
+ void testUploadBatchSizes() throws Exception {
int numWriters = 5, numUploads = 5;
ChangelogStorageMetricGroup metrics =
new ChangelogStorageMetricGroup(createUnregisteredTaskManagerJobMetricGroup());
- Path basePath = Path.fromLocalFile(temporaryFolder.newFolder());
+ Path basePath = Path.fromLocalFile(tempFolder.toFile());
StateChangeFsUploader uploader =
new StateChangeFsUploader(basePath, basePath.getFileSystem(), false, 100, metrics);
ManuallyTriggeredScheduledExecutorService scheduler =
@@ -167,15 +168,17 @@ public class ChangelogStorageMetricsTest {
// now the uploads should be grouped and executed at once
scheduler.triggerScheduledTasks();
}
- assertEquals(numWriters, metrics.getUploadBatchSizes().getStatistics().getMin());
- assertEquals(numWriters, metrics.getUploadBatchSizes().getStatistics().getMax());
+ assertThat(metrics.getUploadBatchSizes().getStatistics().getMin())
+ .isEqualTo(numWriters);
+ assertThat(metrics.getUploadBatchSizes().getStatistics().getMax())
+ .isEqualTo(numWriters);
} finally {
storage.close();
}
}
@Test
- public void testAttemptsPerUpload() throws Exception {
+ void testAttemptsPerUpload() throws Exception {
int numUploads = 7, maxAttempts = 3;
ChangelogStorageMetricGroup metrics =
@@ -202,15 +205,15 @@ public class ChangelogStorageMetricsTest {
writer.persist(from).get();
}
HistogramStatistics histogram = metrics.getAttemptsPerUpload().getStatistics();
- assertEquals(maxAttempts, histogram.getMin());
- assertEquals(maxAttempts, histogram.getMax());
+ assertThat(histogram.getMin()).isEqualTo(maxAttempts);
+ assertThat(histogram.getMax()).isEqualTo(maxAttempts);
} finally {
storage.close();
}
}
@Test
- public void testQueueSize() throws Exception {
+ void testQueueSize() throws Exception {
AtomicReference<Gauge<Integer>> queueSizeGauge = new AtomicReference<>();
ChangelogStorageMetricGroup metrics =
new ChangelogStorageMetricGroup(
@@ -228,7 +231,7 @@ public class ChangelogStorageMetricsTest {
new JobID(),
"test"));
- Path path = Path.fromLocalFile(temporaryFolder.newFolder());
+ Path path = Path.fromLocalFile(tempFolder.toFile());
StateChangeFsUploader delegate =
new StateChangeFsUploader(path, path.getFileSystem(), false, 100, metrics);
ManuallyTriggeredScheduledExecutorService scheduler =
@@ -252,9 +255,9 @@ public class ChangelogStorageMetricsTest {
writer.append(0, new byte[] {0});
writer.persist(from);
}
- assertEquals(numUploads, (int) queueSizeGauge.get().getValue());
+ assertThat((int) queueSizeGauge.get().getValue()).isEqualTo(numUploads);
scheduler.triggerScheduledTasks();
- assertEquals(0, (int) queueSizeGauge.get().getValue());
+ assertThat((int) queueSizeGauge.get().getValue()).isEqualTo(0);
}
}
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java
index 602155d..e23945c 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.changelog.fs.BatchingStateChangeUploadSchedulerTest.Bloc
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.runtime.state.changelog.inmemory.StateChangelogStorageTest;
@@ -28,30 +29,29 @@ import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.Test;
+import java.io.File;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.stream.Stream;
import static org.apache.flink.changelog.fs.UnregisteredChangelogStorageMetricGroup.createUnregisteredChangelogStorageMetricGroup;
/** {@link FsStateChangelogStorage} test. */
-@RunWith(Parameterized.class)
-public class FsStateChangelogStorageTest extends StateChangelogStorageTest {
- @Parameterized.Parameter public boolean compression;
+public class FsStateChangelogStorageTest
+ extends StateChangelogStorageTest<ChangelogStateHandleStreamImpl> {
- @Parameterized.Parameters(name = "use compression = {0}")
- public static Object[] parameters() {
- return new Object[] {true, false};
+ public static Stream<Boolean> parameters() {
+ return Stream.of(true, false);
}
@Override
- protected StateChangelogStorage<?> getFactory() throws IOException {
+ protected StateChangelogStorage<ChangelogStateHandleStreamImpl> getFactory(
+ boolean compression, File temporaryFolder) throws IOException {
return new FsStateChangelogStorage(
- Path.fromLocalFile(temporaryFolder.newFolder()),
+ Path.fromLocalFile(temporaryFolder),
compression,
1024 * 1024 * 10,
createUnregisteredChangelogStorageMetricGroup());
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
index f3e576d..b005572 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
@@ -23,27 +23,23 @@ import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.util.function.ThrowingConsumer;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
-import java.util.List;
import java.util.UUID;
+import java.util.stream.Stream;
-import static java.util.Arrays.asList;
import static org.apache.flink.changelog.fs.FsStateChangelogWriterSqnTest.WriterSqnTestSettings.of;
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
/**
* Test of incrementing {@link SequenceNumber sequence numbers} by {@link FsStateChangelogWriter}.
*/
-@RunWith(Parameterized.class)
public class FsStateChangelogWriterSqnTest {
- @Parameterized.Parameters(name = "{0}")
- public static List<WriterSqnTestSettings> getSettings() {
- return asList(
+ private static Stream<WriterSqnTestSettings> getSettings() {
+ return Stream.of(
of(StateChangelogWriter::nextSequenceNumber, "nextSequenceNumber")
.withAppendCall(false)
.expectIncrement(false),
@@ -73,14 +69,9 @@ public class FsStateChangelogWriterSqnTest {
.expectIncrement(true));
}
- private final WriterSqnTestSettings test;
-
- public FsStateChangelogWriterSqnTest(WriterSqnTestSettings test) {
- this.test = test;
- }
-
- @Test
- public void runTest() throws IOException {
+ @MethodSource("getSettings")
+ @ParameterizedTest(name = "writerSqnTestSettings = {0}")
+ void runTest(WriterSqnTestSettings writerSqnTestSettings) throws IOException {
try (FsStateChangelogWriter writer =
new FsStateChangelogWriter(
UUID.randomUUID(),
@@ -89,28 +80,19 @@ public class FsStateChangelogWriterSqnTest {
new TestingStateChangeUploader()),
Long.MAX_VALUE,
new SyncMailboxExecutor())) {
- if (test.withAppend) {
+ if (writerSqnTestSettings.withAppend) {
append(writer);
}
- test.action.accept(writer);
- assertEquals(
- getMessage(),
- test.expectIncrement
- ? writer.initialSequenceNumber().next()
- : writer.initialSequenceNumber(),
- writer.lastAppendedSqnUnsafe());
+ writerSqnTestSettings.action.accept(writer);
+ assertThat(writer.lastAppendedSqnUnsafe())
+ .as(writerSqnTestSettings.getMessage())
+ .isEqualTo(
+ writerSqnTestSettings.expectIncrement
+ ? writer.initialSequenceNumber().next()
+ : writer.initialSequenceNumber());
}
}
- private String getMessage() {
- return test.name
- + " should"
- + (test.expectIncrement ? " " : " NOT ")
- + "increment SQN"
- + (test.expectIncrement ? " after " : " without ")
- + "appends";
- }
-
static class WriterSqnTestSettings {
private final String name;
private final ThrowingConsumer<FsStateChangelogWriter, IOException> action;
@@ -123,6 +105,15 @@ public class FsStateChangelogWriterSqnTest {
this.action = action;
}
+ public String getMessage() {
+ return this.name
+ + " should"
+ + (this.expectIncrement ? " " : " NOT ")
+ + "increment SQN"
+ + (this.expectIncrement ? " after " : " without ")
+ + "appends";
+ }
+
public static WriterSqnTestSettings of(
ThrowingConsumer<FsStateChangelogWriter, IOException> action, String name) {
return new WriterSqnTestSettings(name, action);
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
index b0a49dd..78015e2 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.util.function.BiConsumerWithException;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Random;
@@ -33,16 +33,16 @@ import java.util.concurrent.ExecutionException;
import static org.apache.flink.shaded.guava30.com.google.common.collect.Iterables.getOnlyElement;
import static org.apache.flink.util.ExceptionUtils.rethrowIOException;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** {@link FsStateChangelogWriter} test. */
-public class FsStateChangelogWriterTest {
+class FsStateChangelogWriterTest {
private static final int KEY_GROUP = 0;
private final Random random = new Random();
@Test
- public void testAppend() throws Exception {
+ void testAppend() throws Exception {
withWriter(
(writer, uploader) -> {
writer.append(KEY_GROUP, getBytes());
@@ -53,7 +53,7 @@ public class FsStateChangelogWriterTest {
}
@Test
- public void testPreUpload() throws Exception {
+ void testPreUpload() throws Exception {
int threshold = 1000;
withWriter(
threshold,
@@ -68,7 +68,7 @@ public class FsStateChangelogWriterTest {
}
@Test
- public void testPersist() throws Exception {
+ void testPersist() throws Exception {
withWriter(
(writer, uploader) -> {
byte[] bytes = getBytes();
@@ -76,17 +76,17 @@ public class FsStateChangelogWriterTest {
writer.persist(append(writer, bytes));
assertSubmittedOnly(uploader, bytes);
uploader.completeUpload();
- assertArrayEquals(
- bytes,
- getOnlyElement(future.get().getHandlesAndOffsets())
- .f0
- .asBytesIfInMemory()
- .get());
+ assertThat(
+ getOnlyElement(future.get().getHandlesAndOffsets())
+ .f0
+ .asBytesIfInMemory()
+ .get())
+ .isEqualTo(bytes);
});
}
@Test
- public void testPersistAgain() throws Exception {
+ void testPersistAgain() throws Exception {
withWriter(
(writer, uploader) -> {
byte[] bytes = getBytes();
@@ -100,7 +100,7 @@ public class FsStateChangelogWriterTest {
}
@Test
- public void testNoReUploadBeforeCompletion() throws Exception {
+ void testNoReUploadBeforeCompletion() throws Exception {
withWriter(
(writer, uploader) -> {
byte[] bytes = getBytes();
@@ -113,7 +113,7 @@ public class FsStateChangelogWriterTest {
}
@Test
- public void testPersistNewlyAppended() throws Exception {
+ void testPersistNewlyAppended() throws Exception {
withWriter(
(writer, uploader) -> {
SequenceNumber sqn = append(writer, getBytes());
@@ -128,7 +128,7 @@ public class FsStateChangelogWriterTest {
/** Emulates checkpoint abortion followed by a new checkpoint. */
@Test
- public void testPersistAfterReset() throws Exception {
+ void testPersistAfterReset() throws Exception {
withWriter(
(writer, uploader) -> {
byte[] bytes = getBytes();
@@ -140,36 +140,43 @@ public class FsStateChangelogWriterTest {
});
}
- @Test(expected = IOException.class)
- public void testPersistFailure() throws Exception {
- withWriter(
- (writer, uploader) -> {
- byte[] bytes = getBytes();
- SequenceNumber sqn = append(writer, bytes);
- CompletableFuture<ChangelogStateHandleStreamImpl> future = writer.persist(sqn);
- uploader.failUpload(new RuntimeException("test"));
- try {
- future.get();
- } catch (ExecutionException e) {
- rethrowIOException(e.getCause());
- }
- });
+ @Test
+ void testPersistFailure() {
+ assertThatThrownBy(
+ () ->
+ withWriter(
+ (writer, uploader) -> {
+ byte[] bytes = getBytes();
+ SequenceNumber sqn = append(writer, bytes);
+ CompletableFuture<ChangelogStateHandleStreamImpl>
+ future = writer.persist(sqn);
+ uploader.failUpload(new RuntimeException("test"));
+ try {
+ future.get();
+ } catch (ExecutionException e) {
+ rethrowIOException(e.getCause());
+ }
+ }))
+ .isInstanceOf(IOException.class);
}
- @Test(expected = IOException.class)
- public void testPersistFailedChanges() throws Exception {
- withWriter(
- (writer, uploader) -> {
- byte[] bytes = getBytes();
- SequenceNumber sqn = append(writer, bytes);
- writer.persist(sqn); // future result ignored
- uploader.failUpload(new RuntimeException("test"));
- writer.persist(sqn); // should fail right away
- });
+ @Test
+ void testPersistFailedChanges() {
+ assertThatThrownBy(
+ () ->
+ withWriter(
+ (writer, uploader) -> {
+ byte[] bytes = getBytes();
+ SequenceNumber sqn = append(writer, bytes);
+ writer.persist(sqn); // future result ignored
+ uploader.failUpload(new RuntimeException("test"));
+ writer.persist(sqn); // should fail right away
+ }))
+ .isInstanceOf(IOException.class);
}
@Test
- public void testPersistNonFailedChanges() throws Exception {
+ void testPersistNonFailedChanges() throws Exception {
withWriter(
(writer, uploader) -> {
byte[] bytes = getBytes();
@@ -184,14 +191,17 @@ public class FsStateChangelogWriterTest {
});
}
- @Test(expected = IllegalArgumentException.class)
- public void testTruncate() throws Exception {
- withWriter(
- (writer, uploader) -> {
- SequenceNumber sqn = append(writer, getBytes());
- writer.truncate(sqn.next());
- writer.persist(sqn);
- });
+ @Test
+ void testTruncate() {
+ assertThatThrownBy(
+ () ->
+ withWriter(
+ (writer, uploader) -> {
+ SequenceNumber sqn = append(writer, getBytes());
+ writer.truncate(sqn.next());
+ writer.persist(sqn);
+ }))
+ .isInstanceOf(IllegalArgumentException.class);
}
private void withWriter(
@@ -219,9 +229,8 @@ public class FsStateChangelogWriterTest {
}
private void assertSubmittedOnly(TestingStateChangeUploader uploader, byte[] bytes) {
- assertArrayEquals(
- bytes,
- getOnlyElement(getOnlyElement(uploader.getUploaded()).getChanges()).getChange());
+ assertThat(getOnlyElement(getOnlyElement(uploader.getUploaded()).getChanges()).getChange())
+ .isEqualTo(bytes);
}
private SequenceNumber append(FsStateChangelogWriter writer, byte[] bytes) throws IOException {
@@ -241,6 +250,6 @@ public class FsStateChangelogWriterTest {
}
private static void assertNoUpload(TestingStateChangeUploader uploader, String message) {
- assertTrue(message, uploader.getUploaded().isEmpty());
+ assertThat(uploader.getUploaded()).isEmpty();
}
}
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java
index 9e838b3..3727c81 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java
@@ -24,7 +24,8 @@ import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingConsumer;
-import org.junit.Test;
+import org.assertj.core.data.Percentage;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.time.Duration;
@@ -44,11 +45,11 @@ import java.util.stream.IntStream;
import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;
import static org.apache.flink.changelog.fs.UnregisteredChangelogStorageMetricGroup.createUnregisteredChangelogStorageMetricGroup;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
/** {@link RetryingExecutor} test. */
-public class RetryingExecutorTest {
+class RetryingExecutorTest {
private static final ThrowingConsumer<Integer, Exception> FAILING_TASK =
attempt -> {
@@ -56,17 +57,17 @@ public class RetryingExecutorTest {
};
@Test
- public void testNoRetries() throws Exception {
+ void testNoRetries() throws Exception {
testPolicy(1, RetryPolicy.NONE, FAILING_TASK);
}
@Test
- public void testFixedRetryLimit() throws Exception {
+ void testFixedRetryLimit() throws Exception {
testPolicy(5, RetryPolicy.fixed(5, 0, 0), FAILING_TASK);
}
@Test
- public void testDiscardOnTimeout() throws Exception {
+ void testDiscardOnTimeout() throws Exception {
int timeoutMs = 5;
int numAttempts = 7;
int successfulAttempt = numAttempts - 1;
@@ -115,14 +116,13 @@ public class RetryingExecutorTest {
Thread.sleep(10);
}
}
- assertEquals(singletonList(successfulAttempt), completed);
- assertEquals(
- IntStream.range(0, successfulAttempt).boxed().collect(toList()),
- discarded.stream().sorted().collect(toList()));
+ assertThat(singletonList(successfulAttempt)).isEqualTo(completed);
+ assertThat(IntStream.range(0, successfulAttempt).boxed().collect(toList()))
+ .isEqualTo(discarded.stream().sorted().collect(toList()));
}
@Test
- public void testFixedRetrySuccess() throws Exception {
+ void testFixedRetrySuccess() throws Exception {
int successfulAttempt = 3;
int maxAttempts = successfulAttempt * 2;
testPolicy(
@@ -136,7 +136,7 @@ public class RetryingExecutorTest {
}
@Test
- public void testNonRetryableException() throws Exception {
+ void testNonRetryableException() throws Exception {
testPolicy(
1,
RetryPolicy.fixed(Integer.MAX_VALUE, 0, 0),
@@ -146,7 +146,7 @@ public class RetryingExecutorTest {
}
@Test
- public void testRetryDelay() throws Exception {
+ void testRetryDelay() throws Exception {
int delayAfterFailure = 123;
int numAttempts = 2;
testPolicy(
@@ -161,7 +161,7 @@ public class RetryingExecutorTest {
@Override
public ScheduledFuture<?> schedule(
Runnable command, long delay, TimeUnit unit) {
- assertEquals(delayAfterFailure, delay);
+ assertThat(delay).isEqualTo(delayAfterFailure);
command.run();
return CompletedScheduledFuture.create(null);
}
@@ -169,7 +169,7 @@ public class RetryingExecutorTest {
}
@Test
- public void testNoRetryDelayIfTimeout() throws Exception {
+ void testNoRetryDelayIfTimeout() throws Exception {
int delayAfterFailure = 123;
int numAttempts = 2;
testPolicy(
@@ -191,7 +191,7 @@ public class RetryingExecutorTest {
}
@Test
- public void testTimeout() throws Exception {
+ void testTimeout() throws Exception {
int numAttempts = 2;
int timeout = 500;
CompletableFuture<Long> firstStart = new CompletableFuture<>();
@@ -209,11 +209,9 @@ public class RetryingExecutorTest {
}
},
Executors.newScheduledThreadPool(2));
- assertEquals(
- timeout,
- ((double) secondStart.get() - firstStart.get()) / 1_000_000,
- timeout
- * 0.75d /* future completion can be delayed arbitrarily causing start delta be less than timeout */);
+ /* Future completion can be delayed arbitrarily, so the start delta may be less than the timeout. */
+ assertThat(((double) secondStart.get() - firstStart.get()) / 1_000_000)
+ .isCloseTo(timeout, Percentage.withPercentage(75));
}
private void testPolicy(
@@ -246,7 +244,7 @@ public class RetryingExecutorTest {
}));
firstAttemptCompletedLatch.await(); // before closing executor
}
- assertEquals(expectedAttempts, attemptsMade.get());
+ assertThat(attemptsMade.get()).isEqualTo(expectedAttempts);
}
private static RetriableAction<?> runnableToAction(RunnableWithException action) {
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/TestingStateChangeUploader.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/TestingStateChangeUploader.java
index 8e29dd1..4bedadf 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/TestingStateChangeUploader.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/TestingStateChangeUploader.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toList;
+/** {@link StateChangeUploader} implementation for use in tests. */
class TestingStateChangeUploader implements StateChangeUploader {
private final Collection<StateChangeSet> uploaded = new CopyOnWriteArrayList<>();
private final List<UploadTask> tasks;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
index d7ddf40..664239e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
@@ -28,10 +28,11 @@ import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.util.CloseableIterator;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -45,35 +46,44 @@ import java.util.stream.Stream;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.StreamSupport.stream;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** {@link InMemoryStateChangelogStorage} test. */
public class StateChangelogStorageTest<T extends ChangelogStateHandle> {
private final Random random = new Random();
- @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
- @Test(expected = IllegalStateException.class)
- public void testNoAppendAfterClose() throws IOException {
- StateChangelogWriter<?> writer =
- getFactory()
- .createWriter(
- new OperatorID().toString(),
- KeyGroupRange.of(0, 0),
- new SyncMailboxExecutor());
- writer.close();
- writer.append(0, new byte[0]);
+ @TempDir public File temporaryFolder;
+
+ public static Stream<Boolean> parameters() {
+ return Stream.of(true);
+ }
+
+ @MethodSource("parameters")
+ @ParameterizedTest(name = "compression = {0}")
+ public void testNoAppendAfterClose(boolean compression) throws IOException {
+ assertThatThrownBy(
+ () -> {
+ StateChangelogWriter<?> writer =
+ getFactory(compression, temporaryFolder)
+ .createWriter(
+ new OperatorID().toString(),
+ KeyGroupRange.of(0, 0),
+ new SyncMailboxExecutor());
+ writer.close();
+ writer.append(0, new byte[0]);
+ })
+ .isInstanceOf(IllegalStateException.class);
}
- @Test
- public void testWriteAndRead() throws Exception {
+ @MethodSource("parameters")
+ @ParameterizedTest(name = "compression = {0}")
+ public void testWriteAndRead(boolean compression) throws Exception {
KeyGroupRange kgRange = KeyGroupRange.of(0, 5);
Map<Integer, List<byte[]>> appendsByKeyGroup = generateAppends(kgRange, 10, 20);
- try (StateChangelogStorage<T> client = getFactory();
+ try (StateChangelogStorage<T> client = getFactory(compression, temporaryFolder);
StateChangelogWriter<T> writer =
client.createWriter(
new OperatorID().toString(), kgRange, new SyncMailboxExecutor())) {
@@ -95,16 +105,16 @@ public class StateChangelogStorageTest<T extends ChangelogStateHandle> {
private void assertByteMapsEqual(
Map<Integer, List<byte[]>> expected, Map<Integer, List<byte[]>> actual) {
- assertEquals(expected.size(), actual.size());
+ assertThat(actual).hasSameSizeAs(expected);
for (Map.Entry<Integer, List<byte[]>> e : expected.entrySet()) {
List<byte[]> expectedList = e.getValue();
List<byte[]> actualList = actual.get(e.getKey());
Iterator<byte[]> ite = expectedList.iterator(), ale = actualList.iterator();
while (ite.hasNext() && ale.hasNext()) {
- assertArrayEquals(ite.next(), ale.next());
+ assertThat(ale.next()).isEqualTo(ite.next());
}
- assertFalse(ite.hasNext());
- assertFalse(ale.hasNext());
+ assertThat(ite.hasNext()).isFalse();
+ assertThat(ale.hasNext()).isFalse();
}
}
@@ -139,7 +149,8 @@ public class StateChangelogStorageTest<T extends ChangelogStateHandle> {
return bytes;
}
- protected StateChangelogStorage<T> getFactory() throws IOException {
+ protected StateChangelogStorage<T> getFactory(boolean compression, File temporaryFolder)
+ throws IOException {
return (StateChangelogStorage<T>) new InMemoryStateChangelogStorage();
}
}