You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2022/03/21 13:36:30 UTC

[flink] 04/04: [FLINK-26420][test] migrate AsyncSinkWriterTest to AssertJ.

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 71ee033865a3adc3247c47d8636b16e99b924470
Author: Jing Ge <ge...@gmail.com>
AuthorDate: Tue Mar 15 18:38:56 2022 +0100

    [FLINK-26420][test] migrate AsyncSinkWriterTest to AssertJ.
    
    (cherry picked from commit a2df2665b6ff411a2aeb9b204fd9d46a2af0ecfa)
---
 .../base/sink/writer/AsyncSinkWriterTest.java      | 224 ++++++++++++---------
 1 file changed, 130 insertions(+), 94 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
index c6e95c7..47c437e 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
@@ -21,8 +21,8 @@ import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -41,11 +41,8 @@ import java.util.stream.Collectors;
 import static org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
 
 /**
  * Unit Tests the functionality of AsyncSinkWriter without any assumptions of what a concrete
@@ -57,7 +54,7 @@ public class AsyncSinkWriterTest {
     private TestSinkInitContext sinkInitContext;
     private TestSinkInitContextAnyThreadMailbox sinkInitContextAnyThreadMailbox;
 
-    @Before
+    @BeforeEach
     public void before() {
         res.clear();
         sinkInitContext = new TestSinkInitContext();
@@ -77,17 +74,20 @@ public class AsyncSinkWriterTest {
     public void testNumberOfRecordsIsAMultipleOfBatchSizeResultsInThatNumberOfRecordsBeingWritten()
             throws IOException, InterruptedException {
         performNormalWriteOfEightyRecordsToMock();
-        assertEquals(80, res.size());
+
+        assertThat(res.size()).isEqualTo(80);
     }
 
     @Test
     public void testMetricsGroupHasLoggedNumberOfRecordsAndNumberOfBytesCorrectly()
             throws IOException, InterruptedException {
         performNormalWriteOfEightyRecordsToMock();
-        assertEquals(80, sinkInitContext.getNumRecordsOutCounter().getCount());
-        assertEquals(320, sinkInitContext.getNumBytesOutCounter().getCount());
-        assertTrue(sinkInitContext.getCurrentSendTimeGauge().get().getValue() >= 0);
-        assertTrue(sinkInitContext.getCurrentSendTimeGauge().get().getValue() < 1000);
+
+        assertThat(sinkInitContext.getNumRecordsOutCounter().getCount()).isEqualTo(80);
+        assertThat(sinkInitContext.getNumBytesOutCounter().getCount()).isEqualTo(320);
+        assertThat(sinkInitContext.getCurrentSendTimeGauge().get().getValue())
+                .isGreaterThanOrEqualTo(0);
+        assertThat(sinkInitContext.getCurrentSendTimeGauge().get().getValue()).isLessThan(1000);
     }
 
     @Test
@@ -101,8 +101,10 @@ public class AsyncSinkWriterTest {
         for (int i = 0; i < 4; i++) {
             sink.write(String.valueOf(i));
         }
-        assertTrue(sinkInitContext.getCurrentSendTimeGauge().get().getValue() >= 99);
-        assertTrue(sinkInitContext.getCurrentSendTimeGauge().get().getValue() < 110);
+
+        assertThat(sinkInitContext.getCurrentSendTimeGauge().get().getValue())
+                .isGreaterThanOrEqualTo(99);
+        assertThat(sinkInitContext.getCurrentSendTimeGauge().get().getValue()).isLessThan(110);
     }
 
     @Test
@@ -113,7 +115,8 @@ public class AsyncSinkWriterTest {
         for (int i = 0; i < 23; i++) {
             sink.write(String.valueOf(i));
         }
-        assertEquals(20, res.size());
+
+        assertThat(res.size()).isEqualTo(20);
         assertThatBufferStatesAreEqual(sink.wrapRequests(20, 21, 22), getWriterState(sink));
     }
 
@@ -129,7 +132,8 @@ public class AsyncSinkWriterTest {
         sink.write("1"); // 4 bytes per record
         sink.write("2"); // to give 12 bytes in final flush
         sink.write("3");
-        assertEquals(3, res.size());
+
+        assertThat(res.size()).isEqualTo(3);
     }
 
     @Test
@@ -141,7 +145,8 @@ public class AsyncSinkWriterTest {
             sink.write(String.valueOf(i));
         }
         sink.flush(true);
-        assertEquals(23, res.size());
+
+        assertThat(res.size()).isEqualTo(23);
     }
 
     @Test
@@ -151,7 +156,8 @@ public class AsyncSinkWriterTest {
                 new AsyncSinkWriterImplBuilder().context(sinkInitContext).build();
         sink.write(String.valueOf(0));
         sink.flush(true);
-        assertEquals(1, res.size());
+
+        assertThat(res.size()).isEqualTo(1);
     }
 
     @Test
@@ -162,12 +168,14 @@ public class AsyncSinkWriterTest {
 
         sink.write("25");
         sink.write("55");
+
         assertThatBufferStatesAreEqual(sink.wrapRequests(25, 55), getWriterState(sink));
-        assertEquals(0, res.size());
+        assertThat(res.size()).isEqualTo(0);
 
         sink.write("75");
+
         assertThatBufferStatesAreEqual(BufferedRequestState.emptyState(), getWriterState(sink));
-        assertEquals(3, res.size());
+        assertThat(res.size()).isEqualTo(3);
     }
 
     public void writeFiveRecordsWithOneFailingThenCallPrepareCommitWithFlushing()
@@ -180,6 +188,7 @@ public class AsyncSinkWriterTest {
         sink.write("75");
         sink.write("95");
         sink.write("955");
+
         assertThatBufferStatesAreEqual(sink.wrapRequests(95, 955), getWriterState(sink));
         sink.flush(true);
         assertThatBufferStatesAreEqual(BufferedRequestState.emptyState(), getWriterState(sink));
@@ -189,15 +198,17 @@ public class AsyncSinkWriterTest {
     public void testThatSnapshotsAreTakenOfBufferCorrectlyBeforeAndAfterManualFlush()
             throws IOException, InterruptedException {
         writeFiveRecordsWithOneFailingThenCallPrepareCommitWithFlushing();
-        assertEquals(5, res.size());
+
+        assertThat(res.size()).isEqualTo(5);
     }
 
     @Test
     public void metricsAreLoggedEachTimeSubmitRequestEntriesIsCalled()
             throws IOException, InterruptedException {
         writeFiveRecordsWithOneFailingThenCallPrepareCommitWithFlushing();
-        assertEquals(5, sinkInitContext.getNumRecordsOutCounter().getCount());
-        assertEquals(20, sinkInitContext.getNumBytesOutCounter().getCount());
+
+        assertThat(sinkInitContext.getNumRecordsOutCounter().getCount()).isEqualTo(5);
+        assertThat(sinkInitContext.getNumBytesOutCounter().getCount()).isEqualTo(20);
     }
 
     @Test
@@ -215,11 +226,11 @@ public class AsyncSinkWriterTest {
         sink.write("75");
         sink.write("95");
         sink.write("35");
-        Exception e = assertThrows(RuntimeException.class, () -> sink.write("135"));
-        assertEquals(
-                "Deliberate runtime exception occurred in SinkWriterImplementation.",
-                e.getMessage());
-        assertEquals(3, res.size());
+
+        assertThatThrownBy(() -> sink.write("135"))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessage("Deliberate runtime exception occurred in SinkWriterImplementation.");
+        assertThat(res.size()).isEqualTo(3);
     }
 
     @Test
@@ -292,8 +303,8 @@ public class AsyncSinkWriterTest {
         sink.flush(true);
 
         // Everything is saved
-        assertEquals(Arrays.asList(25, 55, 965, 75, 95, 955, 550, 45, 35, 535), res);
-        assertEquals(0, getWriterState(sink).getStateSize());
+        assertThat(res).isEqualTo(Arrays.asList(25, 55, 965, 75, 95, 955, 550, 45, 35, 535));
+        assertThat(getWriterState(sink).getStateSize()).isEqualTo(0);
     }
 
     @Test
@@ -342,40 +353,39 @@ public class AsyncSinkWriterTest {
         // Buffer continues to fill up without blocking on write, until eventually yield is called
         // on the mailbox thread during the prepare commit
         sink.flush(true);
-        assertEquals(Arrays.asList(25, 55, 965, 75, 95, 955, 550, 645, 545, 535, 515, 505), res);
+
+        assertThat(res)
+                .isEqualTo(Arrays.asList(25, 55, 965, 75, 95, 955, 550, 645, 545, 535, 515, 505));
     }
 
     @Test
     public void testThatMaxBufferSizeOfSinkShouldBeStrictlyGreaterThanMaxSizeOfEachBatch() {
-        Exception e =
-                assertThrows(
-                        IllegalArgumentException.class,
+        assertThatThrownBy(
                         () ->
                                 new AsyncSinkWriterImplBuilder()
                                         .context(sinkInitContext)
                                         .maxBufferedRequests(10)
-                                        .build());
-        assertEquals(
-                "The maximum number of requests that may be buffered should be "
-                        + "strictly greater than the maximum number of requests per batch.",
-                e.getMessage());
+                                        .build())
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "The maximum number of requests that may be buffered should be "
+                                + "strictly greater than the maximum number of requests per batch.");
     }
 
     @Test
     public void maxRecordSizeSetMustBeSmallerThanOrEqualToMaxBatchSize() {
-        Exception e =
-                assertThrows(
-                        IllegalArgumentException.class,
+        assertThatThrownBy(
                         () ->
                                 new AsyncSinkWriterImplBuilder()
                                         .context(sinkInitContext)
                                         .maxBufferedRequests(11)
                                         .maxBatchSizeInBytes(10_000)
                                         .maxRecordSizeInBytes(10_001)
-                                        .build());
-        assertEquals(
-                "The maximum allowed size in bytes per flush must be greater than or equal to the maximum allowed size in bytes of a single record.",
-                e.getMessage());
+                                        .build())
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining(
+                        "The maximum allowed size in bytes per flush must be greater than or equal to"
+                                + " the maximum allowed size in bytes of a single record.");
     }
 
     @Test
@@ -388,17 +398,20 @@ public class AsyncSinkWriterTest {
                         .maxBatchSizeInBytes(10_000)
                         .maxRecordSizeInBytes(3)
                         .build();
-        Exception e = assertThrows(IllegalArgumentException.class, () -> sink.write("3"));
-        assertEquals(
-                "The request entry sent to the buffer was of size [4], when the maxRecordSizeInBytes was set to [3].",
-                e.getMessage());
+
+        assertThatThrownBy(() -> sink.write("3"))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "The request entry sent to the buffer was of size [4], when "
+                                + "the maxRecordSizeInBytes was set to [3].");
     }
 
     private void writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
             AsyncSinkWriterImpl sink, String x, List<Integer> y, List<Integer> z)
             throws IOException, InterruptedException {
         sink.write(x);
-        assertEquals(y, res);
+
+        assertThat(res).isEqualTo(y);
         assertThatBufferStatesAreEqual(sink.wrapRequests(z), getWriterState(sink));
     }
 
@@ -418,7 +431,8 @@ public class AsyncSinkWriterTest {
          */
         for (int i = 0; i < 100; i++) {
             sink.write(String.valueOf(i));
-            assertEquals((i / 7) * 7, res.size());
+
+            assertThat(res.size()).isEqualTo((i / 7) * 7);
         }
     }
 
@@ -430,7 +444,8 @@ public class AsyncSinkWriterTest {
         sink.write(String.valueOf(1));
         sink.write(String.valueOf(2));
         sink.flush(false);
-        assertEquals(0, res.size());
+
+        assertThat(res.size()).isEqualTo(0);
     }
 
     @Test
@@ -447,11 +462,14 @@ public class AsyncSinkWriterTest {
         for (int i = 0; i < 7; i++) {
             sink.write(String.valueOf(i));
         }
-        assertEquals(7, res.size());
+
+        assertThat(res.size()).isEqualTo(7);
+
         for (int i = 7; i < 14; i++) {
             sink.write(String.valueOf(i));
         }
-        assertEquals(14, res.size());
+
+        assertThat(res.size()).isEqualTo(14);
     }
 
     @Test
@@ -468,13 +486,17 @@ public class AsyncSinkWriterTest {
         sink.write(String.valueOf(1)); //   Buffer: 104/110B; 2/10 elements; 0 inflight
         sink.write(String.valueOf(2)); //   Buffer: 108/110B; 3/10 elements; 0 inflight
         sink.write(String.valueOf(3)); //   Buffer: 112/110B; 4/10 elements; 0 inflight -- flushing
-        assertEquals(2, res.size()); // Request was [225, 1, 2], element 225 failed on first attempt
+
+        assertThat(res.size())
+                .isEqualTo(2); // Request was [225, 1, 2], element 225 failed on first attempt
+
         sink.write(String.valueOf(4)); //   Buffer:   8/110B; 2/10 elements; 1 inflight
         sink.write(String.valueOf(5)); //   Buffer:  12/110B; 3/10 elements; 1 inflight
         sink.write(String.valueOf(6)); //   Buffer:  16/110B; 4/10 elements; 1 inflight
         sink.write(String.valueOf(325)); // Buffer: 116/110B; 5/10 elements; 1 inflight -- flushing
+
         // inflight request is processed, buffer: [225, 3, 4, 5, 6, 325]
-        assertEquals(Arrays.asList(1, 2, 225, 3, 4), res);
+        assertThat(res).isEqualTo(Arrays.asList(1, 2, 225, 3, 4));
         // Buffer: [5, 6, 325]; 0 inflight
     }
 
@@ -494,14 +516,18 @@ public class AsyncSinkWriterTest {
         sink.write(String.valueOf(1)); //   Buffer: 204/210B; 3/10 elements; 0 inflight
         sink.write(String.valueOf(2)); //   Buffer: 208/210B; 4/10 elements; 0 inflight
         sink.write(String.valueOf(3)); //   Buffer: 212/210B; 5/10 elements; 0 inflight -- flushing
-        assertEquals(2, res.size()); // Request was [228, 225, 1, 2], element 228, 225 failed
+
+        assertThat(res.size())
+                .isEqualTo(2); // Request was [228, 225, 1, 2], element 228, 225 failed
+
         sink.write(String.valueOf(4)); //   Buffer:   8/210B; 2/10 elements; 2 inflight
         sink.write(String.valueOf(5)); //   Buffer:  12/210B; 3/10 elements; 2 inflight
         sink.write(String.valueOf(6)); //   Buffer:  16/210B; 4/10 elements; 2 inflight
         sink.write(String.valueOf(328)); // Buffer: 116/210B; 5/10 elements; 2 inflight
         sink.write(String.valueOf(325)); // Buffer: 216/210B; 6/10 elements; 2 inflight -- flushing
+
         // inflight request is processed, buffer: [228, 225, 3, 4, 5, 6, 328, 325]
-        assertEquals(Arrays.asList(1, 2, 228, 225, 3, 4), res);
+        assertThat(res).isEqualTo(Arrays.asList(1, 2, 228, 225, 3, 4));
         // Buffer: [5, 6, 328, 325]; 0 inflight
     }
 
@@ -526,9 +552,12 @@ public class AsyncSinkWriterTest {
         }
 
         tpts.setCurrentTime(99L);
-        assertEquals(0, res.size());
+
+        assertThat(res.size()).isEqualTo(0);
+
         tpts.setCurrentTime(100L);
-        assertEquals(8, res.size());
+
+        assertThat(res.size()).isEqualTo(8);
     }
 
     @Test
@@ -551,9 +580,9 @@ public class AsyncSinkWriterTest {
             sink.write(String.valueOf(i));
         }
         tpts.setCurrentTime(99L);
-        assertEquals(90, res.size());
+        assertThat(res.size()).isEqualTo(90);
         tpts.setCurrentTime(100L);
-        assertEquals(98, res.size());
+        assertThat(res.size()).isEqualTo(98);
     }
 
     @Test
@@ -571,15 +600,17 @@ public class AsyncSinkWriterTest {
         sink.write(String.valueOf(1)); // buffer: [225, 0, 1]
         sink.write(String.valueOf(2)); // buffer: [2], inflight: [225], destination: [0, 1]
 
-        assertEquals(Arrays.asList(0, 1), res);
+        assertThat(res).isEqualTo(Arrays.asList(0, 1));
         assertThatBufferStatesAreEqual(sink.wrapRequests(2), getWriterState(sink));
 
         sink.flush(false); // buffer: [225, 2], inflight: [], destination: [0, 1]
-        assertEquals(Arrays.asList(0, 1), res);
+
+        assertThat(res).isEqualTo(Arrays.asList(0, 1));
         assertThatBufferStatesAreEqual(sink.wrapRequests(225, 2), getWriterState(sink));
 
         sink.flush(true); // buffer: [], inflight: [], destination: [0, 1, 225, 2]
-        assertEquals(Arrays.asList(0, 1, 225, 2), res);
+
+        assertThat(res).isEqualTo(Arrays.asList(0, 1, 225, 2));
     }
 
     @Test
@@ -596,11 +627,14 @@ public class AsyncSinkWriterTest {
         sink.write(String.valueOf(1)); //   Buffer: 104/110B; 3/10 elements; 0 inflight
         sink.write(String.valueOf(2)); //   Buffer: 108/110B; 4/10 elements; 0 inflight
         sink.write(String.valueOf(3)); //   Buffer: 112/110B; 5/10 elements; 0 inflight -- flushing
-        assertEquals(2, res.size()); // Request was [225, 1, 2], element 225 failed
+
+        assertThat(res.size()).isEqualTo(2); // Request was [225, 1, 2], element 225 failed
 
         // buffer should be [3] with [225] inflight
         sink.flush(false); // Buffer: [225,3] - > 8/110; 2/10 elements; 0 inflight
-        assertEquals(2, res.size()); //
+
+        assertThat(res.size()).isEqualTo(2); //
+
         List<BufferedRequestState<Integer>> states = sink.snapshotState(1);
         AsyncSinkWriterImpl newSink =
                 new AsyncSinkWriterImplBuilder()
@@ -610,7 +644,8 @@ public class AsyncSinkWriterTest {
 
         newSink.write(String.valueOf(4)); //   Buffer:   12/15B; 3/10 elements; 0 inflight
         newSink.write(String.valueOf(5)); //   Buffer:  16/15B; 4/10 elements; 0 inflight --flushing
-        assertEquals(Arrays.asList(1, 2, 225, 3, 4), res);
+
+        assertThat(res).isEqualTo(Arrays.asList(1, 2, 225, 3, 4));
         // Buffer: [5]; 0 inflight
     }
 
@@ -687,20 +722,20 @@ public class AsyncSinkWriterTest {
         TestProcessingTimeService tpts = sinkInitContext.getTestProcessingTimeService();
         tpts.setCurrentTime(0L);
         sink.write("1"); // A timer is registered here to elapse at t=100
-        assertEquals(0, res.size());
+        assertThat(res.size()).isEqualTo(0);
         tpts.setCurrentTime(10L);
         sink.flush(true);
-        assertEquals(1, res.size());
+        assertThat(res.size()).isEqualTo(1);
         tpts.setCurrentTime(20L); // At t=20, we write a new element that should not trigger another
         sink.write("2"); // timer to be registered. If it is, it should elapse at t=120s.
-        assertEquals(1, res.size());
+        assertThat(res.size()).isEqualTo(1);
         tpts.setCurrentTime(100L);
-        assertEquals(2, res.size());
+        assertThat(res.size()).isEqualTo(2);
         sink.write("3");
         tpts.setCurrentTime(199L); // At t=199s, our third element has not been written
-        assertEquals(2, res.size()); // therefore, no timer fired at 120s.
+        assertThat(res.size()).isEqualTo(2); // therefore, no timer fired at 120s.
         tpts.setCurrentTime(200L);
-        assertEquals(3, res.size());
+        assertThat(res.size()).isEqualTo(3);
     }
 
     @Test
@@ -721,13 +756,13 @@ public class AsyncSinkWriterTest {
         sink.write("2");
         sink.write("225");
         tpts.setCurrentTime(100L);
-        assertEquals(2, res.size());
+        assertThat(res.size()).isEqualTo(2);
         sink.write("3");
         sink.write("4");
         tpts.setCurrentTime(199L);
-        assertEquals(2, res.size());
+        assertThat(res.size()).isEqualTo(2);
         tpts.setCurrentTime(200L);
-        assertEquals(5, res.size());
+        assertThat(res.size()).isEqualTo(5);
     }
 
     @Test
@@ -748,7 +783,7 @@ public class AsyncSinkWriterTest {
         sink.write("1");
         tpts.setCurrentTime(50L);
         sink.flush(true);
-        assertEquals(1, res.size());
+        assertThat(res.size()).isEqualTo(1);
         tpts.setCurrentTime(200L);
     }
 
@@ -769,10 +804,10 @@ public class AsyncSinkWriterTest {
         tpts.setCurrentTime(0L);
         sink.write("1");
         tpts.setCurrentTime(100L);
-        assertEquals(1, res.size());
+        assertThat(res.size()).isEqualTo(1);
         sink.write("2");
         tpts.setCurrentTime(200L);
-        assertEquals(2, res.size());
+        assertThat(res.size()).isEqualTo(2);
     }
 
     /**
@@ -806,7 +841,7 @@ public class AsyncSinkWriterTest {
                         true);
 
         writeTwoElementsAndInterleaveTheNextTwoElements(sink, blockedWriteLatch, delayedStartLatch);
-        assertEquals(Arrays.asList(1, 2, 3, 4), res);
+        assertThat(res).isEqualTo(Arrays.asList(1, 2, 3, 4));
     }
 
     /**
@@ -837,7 +872,7 @@ public class AsyncSinkWriterTest {
                         false);
 
         writeTwoElementsAndInterleaveTheNextTwoElements(sink, blockedWriteLatch, delayedStartLatch);
-        assertEquals(Arrays.asList(4, 1, 2, 3), res);
+        assertThat(res).isEqualTo(Arrays.asList(4, 1, 2, 3));
     }
 
     private void writeTwoElementsAndInterleaveTheNextTwoElements(
@@ -866,9 +901,9 @@ public class AsyncSinkWriterTest {
         tpts.setCurrentTime(100L);
         blockedWriteLatch.countDown();
         es.shutdown();
-        assertTrue(
-                es.awaitTermination(500, TimeUnit.MILLISECONDS),
-                "Executor Service stuck at termination, not terminated after 500ms!");
+        assertThat(es.awaitTermination(500, TimeUnit.MILLISECONDS))
+                .as("Executor Service stuck at termination, not terminated after 500ms!")
+                .isTrue();
     }
 
     /**
@@ -924,19 +959,19 @@ public class AsyncSinkWriterTest {
                             }
                         });
         Thread.sleep(300);
-        assertFalse(s.isInterrupted());
+        assertThat(s.isInterrupted()).isFalse();
         s.interrupt();
         blockedWriteLatch.countDown();
 
         t.join();
 
-        assertEquals(Arrays.asList(1, 2, 3), res);
+        assertThat(res).isEqualTo(Arrays.asList(1, 2, 3));
     }
 
     private BufferedRequestState<Integer> getWriterState(
             AsyncSinkWriter<String, Integer> sinkWriter) {
         List<BufferedRequestState<Integer>> states = sinkWriter.snapshotState(1);
-        assertEquals(states.size(), 1);
+        assertThat(states.size()).isEqualTo(1);
         return states.get(0);
     }
 
@@ -1205,10 +1240,11 @@ public class AsyncSinkWriterTest {
                 try {
                     delayedStartLatch.countDown();
                     if (blockForLimitedTime) {
-                        assertFalse(
-                                blockedThreadLatch.await(500, TimeUnit.MILLISECONDS),
-                                "The countdown latch was released before the full amount"
-                                        + "of time was reached.");
+                        assertThat(blockedThreadLatch.await(500, TimeUnit.MILLISECONDS))
+                                .as(
+                                        "The countdown latch was released before the full amount"
+                                                + "of time was reached.")
+                                .isFalse();
                     } else {
                         blockedThreadLatch.await();
                     }