You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by yi...@apache.org on 2022/08/09 15:46:52 UTC

[flink] 01/02: [hotfix] Migrate PartitionedFileWriteReadTest, SortMergeResultPartitionReadSchedulerTest, SortMergeSubpartitionReaderTest to JUnit 5 and AssertJ

This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e92b87df73897843a136bd273a38e90be8ab2d68
Author: Weijie Guo <re...@163.com>
AuthorDate: Wed Aug 3 01:38:35 2022 +0800

    [hotfix] Migrate PartitionedFileWriteReadTest, SortMergeResultPartitionReadSchedulerTest, SortMergeSubpartitionReaderTest to JUnit 5 and AssertJ
    
    This closes #20333.
---
 .../partition/PartitionedFileWriteReadTest.java    | 54 +++++++++-------
 .../SortMergeResultPartitionReadSchedulerTest.java | 75 ++++++++++------------
 .../partition/SortMergeSubpartitionReaderTest.java | 72 ++++++++++-----------
 3 files changed, 98 insertions(+), 103 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java
index adfe71792d5..741fd3f1882 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java
@@ -26,9 +26,8 @@ import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.util.IOUtils;
 
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 import java.nio.channels.FileChannel;
@@ -44,17 +43,17 @@ import java.util.Random;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /**
  * Tests for writing and reading {@link PartitionedFile} with {@link PartitionedFileWriter} and
  * {@link PartitionedFileReader}.
  */
-public class PartitionedFileWriteReadTest {
-
-    @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+class PartitionedFileWriteReadTest {
+    private @TempDir Path tempPath;
 
     @Test
-    public void testWriteAndReadPartitionedFile() throws Exception {
+    void testWriteAndReadPartitionedFile() throws Exception {
         int numSubpartitions = 10;
         int bufferSize = 1024;
         int numBuffers = 1000;
@@ -122,8 +121,7 @@ public class PartitionedFileWriteReadTest {
         IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
 
         for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
-            assertThat(buffersWritten[subpartition].size())
-                    .isEqualTo(buffersRead[subpartition].size());
+            assertThat(buffersWritten[subpartition]).hasSameSizeAs(buffersRead[subpartition]);
             for (int i = 0; i < buffersWritten[subpartition].size(); ++i) {
                 assertBufferEquals(
                         buffersWritten[subpartition].get(i), buffersRead[subpartition].get(i));
@@ -158,7 +156,7 @@ public class PartitionedFileWriteReadTest {
     }
 
     @Test
-    public void testWriteAndReadWithEmptySubpartition() throws Exception {
+    void testWriteAndReadWithEmptySubpartition() throws Exception {
         int numRegions = 10;
         int numSubpartitions = 5;
         int bufferSize = 1024;
@@ -220,39 +218,50 @@ public class PartitionedFileWriteReadTest {
         return new NetworkBuffer(MemorySegmentFactory.wrap(data), (buf) -> {}, dataType, dataSize);
     }
 
-    @Test(expected = IllegalStateException.class)
+    @Test
     public void testNotWriteDataOfTheSameSubpartitionTogether() throws Exception {
         PartitionedFileWriter partitionedFileWriter = createPartitionedFileWriter(2);
         try {
             MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(1024);
 
-            NetworkBuffer buffer1 = new NetworkBuffer(segment, (buf) -> {});
-            partitionedFileWriter.writeBuffers(getBufferWithChannels(buffer1, 1));
+            assertThatThrownBy(
+                            () -> {
+                                NetworkBuffer buffer1 = new NetworkBuffer(segment, (buf) -> {});
+                                partitionedFileWriter.writeBuffers(
+                                        getBufferWithChannels(buffer1, 1));
+
+                                NetworkBuffer buffer2 = new NetworkBuffer(segment, (buf) -> {});
+                                partitionedFileWriter.writeBuffers(
+                                        getBufferWithChannels(buffer2, 0));
 
-            NetworkBuffer buffer2 = new NetworkBuffer(segment, (buf) -> {});
-            partitionedFileWriter.writeBuffers(getBufferWithChannels(buffer2, 0));
+                                NetworkBuffer buffer3 = new NetworkBuffer(segment, (buf) -> {});
+                                partitionedFileWriter.writeBuffers(
+                                        getBufferWithChannels(buffer3, 1));
+                            })
+                    .isInstanceOf(IllegalStateException.class);
 
-            NetworkBuffer buffer3 = new NetworkBuffer(segment, (buf) -> {});
-            partitionedFileWriter.writeBuffers(getBufferWithChannels(buffer3, 1));
         } finally {
             partitionedFileWriter.finish();
         }
     }
 
-    @Test(expected = IllegalStateException.class)
+    @Test
     public void testWriteFinishedPartitionedFile() throws Exception {
         PartitionedFileWriter partitionedFileWriter = createAndFinishPartitionedFileWriter();
 
         MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(1024);
         NetworkBuffer buffer = new NetworkBuffer(segment, (buf) -> {});
 
-        partitionedFileWriter.writeBuffers(getBufferWithChannels(buffer, 0));
+        assertThatThrownBy(
+                        () -> partitionedFileWriter.writeBuffers(getBufferWithChannels(buffer, 0)))
+                .isInstanceOf(IllegalStateException.class);
     }
 
-    @Test(expected = IllegalStateException.class)
+    @Test
     public void testFinishPartitionedFileWriterTwice() throws Exception {
         PartitionedFileWriter partitionedFileWriter = createAndFinishPartitionedFileWriter();
-        partitionedFileWriter.finish();
+        assertThatThrownBy(() -> partitionedFileWriter.finish())
+                .isInstanceOf(IllegalStateException.class);
     }
 
     @Test
@@ -296,8 +305,7 @@ public class PartitionedFileWriteReadTest {
 
     private PartitionedFileWriter createPartitionedFileWriter(int numSubpartitions)
             throws IOException {
-        String basePath = temporaryFolder.newFile().getPath();
-        return new PartitionedFileWriter(numSubpartitions, 640, basePath);
+        return new PartitionedFileWriter(numSubpartitions, 640, tempPath.toString());
     }
 
     private PartitionedFileWriter createAndFinishPartitionedFileWriter() throws IOException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
index 301ecffee1d..26213d4d43d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
@@ -23,15 +23,12 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
-import org.apache.flink.util.TestLogger;
 
-import org.assertj.core.api.Assertions;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -46,13 +43,13 @@ import java.util.Queue;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link SortMergeResultPartitionReadScheduler}. */
-public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
+class SortMergeResultPartitionReadSchedulerTest {
 
     private static final int bufferSize = 1024;
 
@@ -80,17 +77,13 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
 
     private SortMergeResultPartitionReadScheduler readScheduler;
 
-    @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-    @Rule public Timeout timeout = new Timeout(60, TimeUnit.SECONDS);
-
-    @Before
-    public void before() throws Exception {
+    @BeforeEach
+    void before(@TempDir Path basePath) throws Exception {
         Random random = new Random();
         random.nextBytes(dataBytes);
         partitionedFile =
                 PartitionTestUtils.createPartitionedFile(
-                        temporaryFolder.newFile().getAbsolutePath(),
+                        basePath.toString(),
                         numSubpartitions,
                         numBuffersPerSubpartition,
                         bufferSize,
@@ -105,8 +98,8 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
                 new SortMergeResultPartitionReadScheduler(bufferPool, executor, new Object());
     }
 
-    @After
-    public void after() throws Exception {
+    @AfterEach
+    void after() throws Exception {
         dataFileChannel.close();
         indexFileChannel.close();
         partitionedFile.deleteQuietly();
@@ -115,7 +108,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
     }
 
     @Test
-    public void testCreateSubpartitionReader() throws Exception {
+    void testCreateSubpartitionReader() throws Exception {
         SortMergeSubpartitionReader subpartitionReader =
                 readScheduler.createSubpartitionReader(
                         new NoOpBufferAvailablityListener(), 0, partitionedFile);
@@ -141,7 +134,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
     }
 
     @Test
-    public void testOnSubpartitionReaderError() throws Exception {
+    void testOnSubpartitionReaderError() throws Exception {
         SortMergeSubpartitionReader subpartitionReader =
                 readScheduler.createSubpartitionReader(
                         new NoOpBufferAvailablityListener(), 0, partitionedFile);
@@ -152,7 +145,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
     }
 
     @Test
-    public void testReleaseWhileReading() throws Exception {
+    void testReleaseWhileReading() throws Exception {
         SortMergeSubpartitionReader subpartitionReader =
                 readScheduler.createSubpartitionReader(
                         new NoOpBufferAvailablityListener(), 0, partitionedFile);
@@ -169,20 +162,20 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
         assertAllResourcesReleased();
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void testCreateSubpartitionReaderAfterReleased() throws Exception {
+    @Test
+    void testCreateSubpartitionReaderAfterReleased() throws Exception {
         bufferPool.initialize();
         readScheduler.release();
-        try {
-            readScheduler.createSubpartitionReader(
-                    new NoOpBufferAvailablityListener(), 0, partitionedFile);
-        } finally {
-            assertAllResourcesReleased();
-        }
+        assertThatThrownBy(
+                        () ->
+                                readScheduler.createSubpartitionReader(
+                                        new NoOpBufferAvailablityListener(), 0, partitionedFile))
+                .isInstanceOf(IllegalStateException.class);
+        assertAllResourcesReleased();
     }
 
     @Test
-    public void testOnDataReadError() throws Exception {
+    void testOnDataReadError() throws Exception {
         SortMergeSubpartitionReader subpartitionReader =
                 readScheduler.createSubpartitionReader(
                         new NoOpBufferAvailablityListener(), 0, partitionedFile);
@@ -205,7 +198,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
     }
 
     @Test
-    public void testOnReadBufferRequestError() throws Exception {
+    void testOnReadBufferRequestError() throws Exception {
         SortMergeSubpartitionReader subpartitionReader =
                 readScheduler.createSubpartitionReader(
                         new NoOpBufferAvailablityListener(), 0, partitionedFile);
@@ -219,8 +212,9 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
         assertAllResourcesReleased();
     }
 
-    @Test(timeout = 60000)
-    public void testNoDeadlockWhenReadAndReleaseBuffers() throws Exception {
+    @Test
+    @Timeout(60)
+    void testNoDeadlockWhenReadAndReleaseBuffers() throws Exception {
         bufferPool.initialize();
         SortMergeSubpartitionReader subpartitionReader =
                 new SortMergeSubpartitionReader(new NoOpBufferAvailablityListener(), fileReader);
@@ -248,7 +242,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
     }
 
     @Test
-    public void testRequestBufferTimeout() throws Exception {
+    void testRequestBufferTimeout() throws Exception {
         Duration bufferRequestTimeout = Duration.ofSeconds(3);
         List<MemorySegment> buffers = bufferPool.requestBuffers();
         SortMergeResultPartitionReadScheduler readScheduler =
@@ -256,8 +250,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
                         bufferPool, executor, this, bufferRequestTimeout);
 
         long startTimestamp = System.nanoTime();
-        Assertions.assertThatThrownBy(readScheduler::allocateBuffers)
-                .isInstanceOf(TimeoutException.class);
+        assertThatThrownBy(readScheduler::allocateBuffers).isInstanceOf(TimeoutException.class);
         long requestDuration = System.nanoTime() - startTimestamp;
         assertThat(requestDuration > bufferRequestTimeout.toNanos()).isTrue();
 
@@ -266,7 +259,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
     }
 
     @Test
-    public void testRequestTimeoutIsRefreshedAndSuccess() throws Exception {
+    void testRequestTimeoutIsRefreshedAndSuccess() throws Exception {
         Duration bufferRequestTimeout = Duration.ofSeconds(3);
         FakeBatchShuffleReadBufferPool bufferPool =
                 new FakeBatchShuffleReadBufferPool(bufferSize * 3, bufferSize);
@@ -280,8 +273,8 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
         Queue<MemorySegment> allocatedBuffers = readScheduler.allocateBuffers();
         long requestDuration = System.nanoTime() - startTimestamp;
 
-        assertThat(allocatedBuffers.size()).isEqualTo(3);
-        assertThat(requestDuration > bufferRequestTimeout.toNanos() * 2).isTrue();
+        assertThat(allocatedBuffers).hasSize(3);
+        assertThat(requestDuration).isGreaterThan(bufferRequestTimeout.toNanos() * 2);
         assertThat(subpartitionReader.getFailureCause()).isNull();
 
         bufferPool.recycle(allocatedBuffers);
@@ -319,7 +312,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
         assertThat(readScheduler.getDataFileChannel()).isNull();
         assertThat(readScheduler.getIndexFileChannel()).isNull();
         assertThat(readScheduler.isRunning()).isFalse();
-        assertThat(readScheduler.getNumPendingReaders()).isEqualTo(0);
+        assertThat(readScheduler.getNumPendingReaders()).isZero();
 
         if (!bufferPool.isDestroyed()) {
             assertThat(bufferPool.getNumTotalBuffers()).isEqualTo(bufferPool.getAvailableBuffers());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReaderTest.java
index cf28333de1f..f899a0ea142 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReaderTest.java
@@ -24,14 +24,11 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -41,13 +38,13 @@ import java.nio.file.StandardOpenOption;
 import java.util.ArrayDeque;
 import java.util.Queue;
 import java.util.Random;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link SortMergeSubpartitionReader}. */
-public class SortMergeSubpartitionReaderTest extends TestLogger {
+class SortMergeSubpartitionReaderTest {
 
     private static final int bufferSize = 1024;
 
@@ -63,17 +60,13 @@ public class SortMergeSubpartitionReaderTest extends TestLogger {
 
     private FileChannel indexFileChannel;
 
-    @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-    @Rule public Timeout timeout = new Timeout(60, TimeUnit.SECONDS);
-
-    @Before
-    public void before() throws Exception {
+    @BeforeEach
+    void before(@TempDir Path basePath) throws Exception {
         Random random = new Random();
         random.nextBytes(dataBytes);
         partitionedFile =
                 PartitionTestUtils.createPartitionedFile(
-                        temporaryFolder.newFile().getAbsolutePath(),
+                        basePath.toString(),
                         numSubpartitions,
                         numBuffersPerSubpartition,
                         bufferSize,
@@ -82,34 +75,34 @@ public class SortMergeSubpartitionReaderTest extends TestLogger {
         indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
     }
 
-    @After
-    public void after() {
+    @AfterEach
+    void after() {
         IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
         partitionedFile.deleteQuietly();
     }
 
     @Test
-    public void testReadBuffers() throws Exception {
+    void testReadBuffers() throws Exception {
         CountingAvailabilityListener listener = new CountingAvailabilityListener();
         SortMergeSubpartitionReader subpartitionReader =
                 createSortMergeSubpartitionReader(listener);
 
-        assertThat(listener.numNotifications).isEqualTo(0);
-        assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(0);
+        assertThat(listener.numNotifications).isZero();
+        assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isZero();
 
         Queue<MemorySegment> segments = createsMemorySegments(2);
         subpartitionReader.readBuffers(segments, FreeingBufferRecycler.INSTANCE);
 
         assertThat(listener.numNotifications).isEqualTo(1);
         assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(1);
-        assertThat(segments.size()).isEqualTo(0);
+        assertThat(segments).isEmpty();
 
         segments = createsMemorySegments(2);
         subpartitionReader.readBuffers(segments, FreeingBufferRecycler.INSTANCE);
 
         assertThat(listener.numNotifications).isEqualTo(1);
         assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(2);
-        assertThat(segments.size()).isEqualTo(0);
+        assertThat(segments).isEmpty();
 
         while (subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers() > 0) {
             checkNotNull(subpartitionReader.getNextBuffer()).buffer().recycleBuffer();
@@ -119,13 +112,13 @@ public class SortMergeSubpartitionReaderTest extends TestLogger {
         subpartitionReader.readBuffers(segments, FreeingBufferRecycler.INSTANCE);
 
         assertThat(listener.numNotifications).isEqualTo(2);
-        assertThat(numBuffersPerSubpartition - 2)
-                .isEqualTo(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
+        assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers())
+                .isEqualTo(numBuffersPerSubpartition - 2);
         assertThat(segments.size()).isEqualTo(1);
     }
 
     @Test
-    public void testPollBuffers() throws Exception {
+    void testPollBuffers() throws Exception {
         SortMergeSubpartitionReader subpartitionReader =
                 createSortMergeSubpartitionReader(new CountingAvailabilityListener());
 
@@ -155,7 +148,7 @@ public class SortMergeSubpartitionReaderTest extends TestLogger {
     }
 
     @Test
-    public void testFail() throws Exception {
+    void testFail() throws Exception {
         int numSegments = 5;
         Queue<MemorySegment> segments = createsMemorySegments(numSegments);
 
@@ -169,20 +162,20 @@ public class SortMergeSubpartitionReaderTest extends TestLogger {
             assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(4);
 
             subpartitionReader.fail(new RuntimeException("Test exception."));
-            assertThat(subpartitionReader.getReleaseFuture().isDone()).isTrue();
-            assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(0);
+            assertThat(subpartitionReader.getReleaseFuture()).isDone();
+            assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isZero();
             assertThat(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable()).isTrue();
             assertThat(subpartitionReader.isReleased()).isTrue();
 
             assertThat(listener.numNotifications).isEqualTo(2);
             assertThat(subpartitionReader.getFailureCause()).isNotNull();
         } finally {
-            assertThat(numSegments).isEqualTo(segments.size());
+            assertThat(segments).hasSize(numSegments);
         }
     }
 
     @Test
-    public void testReleaseAllResources() throws Exception {
+    void testReleaseAllResources() throws Exception {
         int numSegments = 5;
         Queue<MemorySegment> segments = createsMemorySegments(numSegments);
 
@@ -196,20 +189,20 @@ public class SortMergeSubpartitionReaderTest extends TestLogger {
             assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(4);
 
             subpartitionReader.releaseAllResources();
-            assertThat(subpartitionReader.getReleaseFuture().isDone()).isTrue();
-            assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(0);
+            assertThat(subpartitionReader.getReleaseFuture()).isDone();
+            assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isZero();
             assertThat(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable()).isTrue();
             assertThat(subpartitionReader.isReleased()).isTrue();
 
             assertThat(listener.numNotifications).isEqualTo(1);
             assertThat(subpartitionReader.getFailureCause()).isNull();
         } finally {
-            assertThat(numSegments).isEqualTo(segments.size());
+            assertThat(segments).hasSize(numSegments);
         }
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void testReadBuffersAfterReleased() throws Exception {
+    @Test
+    void testReadBuffersAfterReleased() throws Exception {
         int numSegments = 5;
         Queue<MemorySegment> segments = createsMemorySegments(numSegments);
 
@@ -219,14 +212,15 @@ public class SortMergeSubpartitionReaderTest extends TestLogger {
 
             subpartitionReader.readBuffers(segments, segments::add);
             subpartitionReader.releaseAllResources();
-            subpartitionReader.readBuffers(segments, segments::add);
+            assertThatThrownBy(() -> subpartitionReader.readBuffers(segments, segments::add))
+                    .isInstanceOf(IllegalStateException.class);
         } finally {
-            assertThat(numSegments).isEqualTo(segments.size());
+            assertThat(segments).hasSize(numSegments);
         }
     }
 
     @Test
-    public void testPollBuffersAfterReleased() throws Exception {
+    void testPollBuffersAfterReleased() throws Exception {
         SortMergeSubpartitionReader subpartitionReader =
                 createSortMergeSubpartitionReader(new CountingAvailabilityListener());