You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by yi...@apache.org on 2022/08/09 15:46:52 UTC
[flink] 01/02: [hotfix] Migrate PartitionedFileWriteReadTest, SortMergeResultPartitionReadSchedulerTest, SortMergeSubpartitionReaderTest to JUnit 5 and AssertJ
This is an automated email from the ASF dual-hosted git repository.
yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e92b87df73897843a136bd273a38e90be8ab2d68
Author: Weijie Guo <re...@163.com>
AuthorDate: Wed Aug 3 01:38:35 2022 +0800
[hotfix] Migrate PartitionedFileWriteReadTest, SortMergeResultPartitionReadSchedulerTest, SortMergeSubpartitionReaderTest to JUnit 5 and AssertJ
This closes #20333.
---
.../partition/PartitionedFileWriteReadTest.java | 54 +++++++++-------
.../SortMergeResultPartitionReadSchedulerTest.java | 75 ++++++++++------------
.../partition/SortMergeSubpartitionReaderTest.java | 72 ++++++++++-----------
3 files changed, 98 insertions(+), 103 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java
index adfe71792d5..741fd3f1882 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java
@@ -26,9 +26,8 @@ import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.util.IOUtils;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.nio.channels.FileChannel;
@@ -44,17 +43,17 @@ import java.util.Random;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/**
* Tests for writing and reading {@link PartitionedFile} with {@link PartitionedFileWriter} and
* {@link PartitionedFileReader}.
*/
-public class PartitionedFileWriteReadTest {
-
- @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+class PartitionedFileWriteReadTest {
+ private @TempDir Path tempPath;
@Test
- public void testWriteAndReadPartitionedFile() throws Exception {
+ void testWriteAndReadPartitionedFile() throws Exception {
int numSubpartitions = 10;
int bufferSize = 1024;
int numBuffers = 1000;
@@ -122,8 +121,7 @@ public class PartitionedFileWriteReadTest {
IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
for (int subpartition = 0; subpartition < numSubpartitions; ++subpartition) {
- assertThat(buffersWritten[subpartition].size())
- .isEqualTo(buffersRead[subpartition].size());
+ assertThat(buffersWritten[subpartition]).hasSameSizeAs(buffersRead[subpartition]);
for (int i = 0; i < buffersWritten[subpartition].size(); ++i) {
assertBufferEquals(
buffersWritten[subpartition].get(i), buffersRead[subpartition].get(i));
@@ -158,7 +156,7 @@ public class PartitionedFileWriteReadTest {
}
@Test
- public void testWriteAndReadWithEmptySubpartition() throws Exception {
+ void testWriteAndReadWithEmptySubpartition() throws Exception {
int numRegions = 10;
int numSubpartitions = 5;
int bufferSize = 1024;
@@ -220,39 +218,50 @@ public class PartitionedFileWriteReadTest {
return new NetworkBuffer(MemorySegmentFactory.wrap(data), (buf) -> {}, dataType, dataSize);
}
- @Test(expected = IllegalStateException.class)
+ @Test
public void testNotWriteDataOfTheSameSubpartitionTogether() throws Exception {
PartitionedFileWriter partitionedFileWriter = createPartitionedFileWriter(2);
try {
MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(1024);
- NetworkBuffer buffer1 = new NetworkBuffer(segment, (buf) -> {});
- partitionedFileWriter.writeBuffers(getBufferWithChannels(buffer1, 1));
+ assertThatThrownBy(
+ () -> {
+ NetworkBuffer buffer1 = new NetworkBuffer(segment, (buf) -> {});
+ partitionedFileWriter.writeBuffers(
+ getBufferWithChannels(buffer1, 1));
+
+ NetworkBuffer buffer2 = new NetworkBuffer(segment, (buf) -> {});
+ partitionedFileWriter.writeBuffers(
+ getBufferWithChannels(buffer2, 0));
- NetworkBuffer buffer2 = new NetworkBuffer(segment, (buf) -> {});
- partitionedFileWriter.writeBuffers(getBufferWithChannels(buffer2, 0));
+ NetworkBuffer buffer3 = new NetworkBuffer(segment, (buf) -> {});
+ partitionedFileWriter.writeBuffers(
+ getBufferWithChannels(buffer3, 1));
+ })
+ .isInstanceOf(IllegalStateException.class);
- NetworkBuffer buffer3 = new NetworkBuffer(segment, (buf) -> {});
- partitionedFileWriter.writeBuffers(getBufferWithChannels(buffer3, 1));
} finally {
partitionedFileWriter.finish();
}
}
- @Test(expected = IllegalStateException.class)
+ @Test
public void testWriteFinishedPartitionedFile() throws Exception {
PartitionedFileWriter partitionedFileWriter = createAndFinishPartitionedFileWriter();
MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(1024);
NetworkBuffer buffer = new NetworkBuffer(segment, (buf) -> {});
- partitionedFileWriter.writeBuffers(getBufferWithChannels(buffer, 0));
+ assertThatThrownBy(
+ () -> partitionedFileWriter.writeBuffers(getBufferWithChannels(buffer, 0)))
+ .isInstanceOf(IllegalStateException.class);
}
- @Test(expected = IllegalStateException.class)
+ @Test
public void testFinishPartitionedFileWriterTwice() throws Exception {
PartitionedFileWriter partitionedFileWriter = createAndFinishPartitionedFileWriter();
- partitionedFileWriter.finish();
+ assertThatThrownBy(() -> partitionedFileWriter.finish())
+ .isInstanceOf(IllegalStateException.class);
}
@Test
@@ -296,8 +305,7 @@ public class PartitionedFileWriteReadTest {
private PartitionedFileWriter createPartitionedFileWriter(int numSubpartitions)
throws IOException {
- String basePath = temporaryFolder.newFile().getPath();
- return new PartitionedFileWriter(numSubpartitions, 640, basePath);
+ return new PartitionedFileWriter(numSubpartitions, 640, tempPath.toString());
}
private PartitionedFileWriter createAndFinishPartitionedFileWriter() throws IOException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
index 301ecffee1d..26213d4d43d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
@@ -23,15 +23,12 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
-import org.apache.flink.util.TestLogger;
-import org.assertj.core.api.Assertions;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -46,13 +43,13 @@ import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link SortMergeResultPartitionReadScheduler}. */
-public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
+class SortMergeResultPartitionReadSchedulerTest {
private static final int bufferSize = 1024;
@@ -80,17 +77,13 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
private SortMergeResultPartitionReadScheduler readScheduler;
- @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
-
- @Rule public Timeout timeout = new Timeout(60, TimeUnit.SECONDS);
-
- @Before
- public void before() throws Exception {
+ @BeforeEach
+ void before(@TempDir Path basePath) throws Exception {
Random random = new Random();
random.nextBytes(dataBytes);
partitionedFile =
PartitionTestUtils.createPartitionedFile(
- temporaryFolder.newFile().getAbsolutePath(),
+ basePath.toString(),
numSubpartitions,
numBuffersPerSubpartition,
bufferSize,
@@ -105,8 +98,8 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
new SortMergeResultPartitionReadScheduler(bufferPool, executor, new Object());
}
- @After
- public void after() throws Exception {
+ @AfterEach
+ void after() throws Exception {
dataFileChannel.close();
indexFileChannel.close();
partitionedFile.deleteQuietly();
@@ -115,7 +108,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
}
@Test
- public void testCreateSubpartitionReader() throws Exception {
+ void testCreateSubpartitionReader() throws Exception {
SortMergeSubpartitionReader subpartitionReader =
readScheduler.createSubpartitionReader(
new NoOpBufferAvailablityListener(), 0, partitionedFile);
@@ -141,7 +134,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
}
@Test
- public void testOnSubpartitionReaderError() throws Exception {
+ void testOnSubpartitionReaderError() throws Exception {
SortMergeSubpartitionReader subpartitionReader =
readScheduler.createSubpartitionReader(
new NoOpBufferAvailablityListener(), 0, partitionedFile);
@@ -152,7 +145,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
}
@Test
- public void testReleaseWhileReading() throws Exception {
+ void testReleaseWhileReading() throws Exception {
SortMergeSubpartitionReader subpartitionReader =
readScheduler.createSubpartitionReader(
new NoOpBufferAvailablityListener(), 0, partitionedFile);
@@ -169,20 +162,20 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
assertAllResourcesReleased();
}
- @Test(expected = IllegalStateException.class)
- public void testCreateSubpartitionReaderAfterReleased() throws Exception {
+ @Test
+ void testCreateSubpartitionReaderAfterReleased() throws Exception {
bufferPool.initialize();
readScheduler.release();
- try {
- readScheduler.createSubpartitionReader(
- new NoOpBufferAvailablityListener(), 0, partitionedFile);
- } finally {
- assertAllResourcesReleased();
- }
+ assertThatThrownBy(
+ () ->
+ readScheduler.createSubpartitionReader(
+ new NoOpBufferAvailablityListener(), 0, partitionedFile))
+ .isInstanceOf(IllegalStateException.class);
+ assertAllResourcesReleased();
}
@Test
- public void testOnDataReadError() throws Exception {
+ void testOnDataReadError() throws Exception {
SortMergeSubpartitionReader subpartitionReader =
readScheduler.createSubpartitionReader(
new NoOpBufferAvailablityListener(), 0, partitionedFile);
@@ -205,7 +198,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
}
@Test
- public void testOnReadBufferRequestError() throws Exception {
+ void testOnReadBufferRequestError() throws Exception {
SortMergeSubpartitionReader subpartitionReader =
readScheduler.createSubpartitionReader(
new NoOpBufferAvailablityListener(), 0, partitionedFile);
@@ -219,8 +212,9 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
assertAllResourcesReleased();
}
- @Test(timeout = 60000)
- public void testNoDeadlockWhenReadAndReleaseBuffers() throws Exception {
+ @Test
+ @Timeout(60)
+ void testNoDeadlockWhenReadAndReleaseBuffers() throws Exception {
bufferPool.initialize();
SortMergeSubpartitionReader subpartitionReader =
new SortMergeSubpartitionReader(new NoOpBufferAvailablityListener(), fileReader);
@@ -248,7 +242,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
}
@Test
- public void testRequestBufferTimeout() throws Exception {
+ void testRequestBufferTimeout() throws Exception {
Duration bufferRequestTimeout = Duration.ofSeconds(3);
List<MemorySegment> buffers = bufferPool.requestBuffers();
SortMergeResultPartitionReadScheduler readScheduler =
@@ -256,8 +250,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
bufferPool, executor, this, bufferRequestTimeout);
long startTimestamp = System.nanoTime();
- Assertions.assertThatThrownBy(readScheduler::allocateBuffers)
- .isInstanceOf(TimeoutException.class);
+ assertThatThrownBy(readScheduler::allocateBuffers).isInstanceOf(TimeoutException.class);
long requestDuration = System.nanoTime() - startTimestamp;
assertThat(requestDuration > bufferRequestTimeout.toNanos()).isTrue();
@@ -266,7 +259,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
}
@Test
- public void testRequestTimeoutIsRefreshedAndSuccess() throws Exception {
+ void testRequestTimeoutIsRefreshedAndSuccess() throws Exception {
Duration bufferRequestTimeout = Duration.ofSeconds(3);
FakeBatchShuffleReadBufferPool bufferPool =
new FakeBatchShuffleReadBufferPool(bufferSize * 3, bufferSize);
@@ -280,8 +273,8 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
Queue<MemorySegment> allocatedBuffers = readScheduler.allocateBuffers();
long requestDuration = System.nanoTime() - startTimestamp;
- assertThat(allocatedBuffers.size()).isEqualTo(3);
- assertThat(requestDuration > bufferRequestTimeout.toNanos() * 2).isTrue();
+ assertThat(allocatedBuffers).hasSize(3);
+ assertThat(requestDuration).isGreaterThan(bufferRequestTimeout.toNanos() * 2);
assertThat(subpartitionReader.getFailureCause()).isNull();
bufferPool.recycle(allocatedBuffers);
@@ -319,7 +312,7 @@ public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
assertThat(readScheduler.getDataFileChannel()).isNull();
assertThat(readScheduler.getIndexFileChannel()).isNull();
assertThat(readScheduler.isRunning()).isFalse();
- assertThat(readScheduler.getNumPendingReaders()).isEqualTo(0);
+ assertThat(readScheduler.getNumPendingReaders()).isZero();
if (!bufferPool.isDestroyed()) {
assertThat(bufferPool.getNumTotalBuffers()).isEqualTo(bufferPool.getAvailableBuffers());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReaderTest.java
index cf28333de1f..f899a0ea142 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReaderTest.java
@@ -24,14 +24,11 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.TestLogger;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -41,13 +38,13 @@ import java.nio.file.StandardOpenOption;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Random;
-import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link SortMergeSubpartitionReader}. */
-public class SortMergeSubpartitionReaderTest extends TestLogger {
+class SortMergeSubpartitionReaderTest {
private static final int bufferSize = 1024;
@@ -63,17 +60,13 @@ public class SortMergeSubpartitionReaderTest extends TestLogger {
private FileChannel indexFileChannel;
- @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
-
- @Rule public Timeout timeout = new Timeout(60, TimeUnit.SECONDS);
-
- @Before
- public void before() throws Exception {
+ @BeforeEach
+ void before(@TempDir Path basePath) throws Exception {
Random random = new Random();
random.nextBytes(dataBytes);
partitionedFile =
PartitionTestUtils.createPartitionedFile(
- temporaryFolder.newFile().getAbsolutePath(),
+ basePath.toString(),
numSubpartitions,
numBuffersPerSubpartition,
bufferSize,
@@ -82,34 +75,34 @@ public class SortMergeSubpartitionReaderTest extends TestLogger {
indexFileChannel = openFileChannel(partitionedFile.getIndexFilePath());
}
- @After
- public void after() {
+ @AfterEach
+ void after() {
IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
partitionedFile.deleteQuietly();
}
@Test
- public void testReadBuffers() throws Exception {
+ void testReadBuffers() throws Exception {
CountingAvailabilityListener listener = new CountingAvailabilityListener();
SortMergeSubpartitionReader subpartitionReader =
createSortMergeSubpartitionReader(listener);
- assertThat(listener.numNotifications).isEqualTo(0);
- assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(0);
+ assertThat(listener.numNotifications).isZero();
+ assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isZero();
Queue<MemorySegment> segments = createsMemorySegments(2);
subpartitionReader.readBuffers(segments, FreeingBufferRecycler.INSTANCE);
assertThat(listener.numNotifications).isEqualTo(1);
assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(1);
- assertThat(segments.size()).isEqualTo(0);
+ assertThat(segments).isEmpty();
segments = createsMemorySegments(2);
subpartitionReader.readBuffers(segments, FreeingBufferRecycler.INSTANCE);
assertThat(listener.numNotifications).isEqualTo(1);
assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(2);
- assertThat(segments.size()).isEqualTo(0);
+ assertThat(segments).isEmpty();
while (subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers() > 0) {
checkNotNull(subpartitionReader.getNextBuffer()).buffer().recycleBuffer();
@@ -119,13 +112,13 @@ public class SortMergeSubpartitionReaderTest extends TestLogger {
subpartitionReader.readBuffers(segments, FreeingBufferRecycler.INSTANCE);
assertThat(listener.numNotifications).isEqualTo(2);
- assertThat(numBuffersPerSubpartition - 2)
- .isEqualTo(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
+ assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers())
+ .isEqualTo(numBuffersPerSubpartition - 2);
assertThat(segments.size()).isEqualTo(1);
}
@Test
- public void testPollBuffers() throws Exception {
+ void testPollBuffers() throws Exception {
SortMergeSubpartitionReader subpartitionReader =
createSortMergeSubpartitionReader(new CountingAvailabilityListener());
@@ -155,7 +148,7 @@ public class SortMergeSubpartitionReaderTest extends TestLogger {
}
@Test
- public void testFail() throws Exception {
+ void testFail() throws Exception {
int numSegments = 5;
Queue<MemorySegment> segments = createsMemorySegments(numSegments);
@@ -169,20 +162,20 @@ public class SortMergeSubpartitionReaderTest extends TestLogger {
assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(4);
subpartitionReader.fail(new RuntimeException("Test exception."));
- assertThat(subpartitionReader.getReleaseFuture().isDone()).isTrue();
- assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(0);
+ assertThat(subpartitionReader.getReleaseFuture()).isDone();
+ assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isZero();
assertThat(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable()).isTrue();
assertThat(subpartitionReader.isReleased()).isTrue();
assertThat(listener.numNotifications).isEqualTo(2);
assertThat(subpartitionReader.getFailureCause()).isNotNull();
} finally {
- assertThat(numSegments).isEqualTo(segments.size());
+ assertThat(segments).hasSize(numSegments);
}
}
@Test
- public void testReleaseAllResources() throws Exception {
+ void testReleaseAllResources() throws Exception {
int numSegments = 5;
Queue<MemorySegment> segments = createsMemorySegments(numSegments);
@@ -196,20 +189,20 @@ public class SortMergeSubpartitionReaderTest extends TestLogger {
assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(4);
subpartitionReader.releaseAllResources();
- assertThat(subpartitionReader.getReleaseFuture().isDone()).isTrue();
- assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(0);
+ assertThat(subpartitionReader.getReleaseFuture()).isDone();
+ assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isZero();
assertThat(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable()).isTrue();
assertThat(subpartitionReader.isReleased()).isTrue();
assertThat(listener.numNotifications).isEqualTo(1);
assertThat(subpartitionReader.getFailureCause()).isNull();
} finally {
- assertThat(numSegments).isEqualTo(segments.size());
+ assertThat(segments).hasSize(numSegments);
}
}
- @Test(expected = IllegalStateException.class)
- public void testReadBuffersAfterReleased() throws Exception {
+ @Test
+ void testReadBuffersAfterReleased() throws Exception {
int numSegments = 5;
Queue<MemorySegment> segments = createsMemorySegments(numSegments);
@@ -219,14 +212,15 @@ public class SortMergeSubpartitionReaderTest extends TestLogger {
subpartitionReader.readBuffers(segments, segments::add);
subpartitionReader.releaseAllResources();
- subpartitionReader.readBuffers(segments, segments::add);
+ assertThatThrownBy(() -> subpartitionReader.readBuffers(segments, segments::add))
+ .isInstanceOf(IllegalStateException.class);
} finally {
- assertThat(numSegments).isEqualTo(segments.size());
+ assertThat(segments).hasSize(numSegments);
}
}
@Test
- public void testPollBuffersAfterReleased() throws Exception {
+ void testPollBuffersAfterReleased() throws Exception {
SortMergeSubpartitionReader subpartitionReader =
createSortMergeSubpartitionReader(new CountingAvailabilityListener());