You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2022/03/29 19:43:43 UTC

[flink] 01/03: [refactor][streaming] Migrate Source(Operator)StreamTaskTest to JUnit5 and assertj

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7aefdf2c6aefe8c24af30a4f28a59f2780503d21
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Thu Mar 17 22:07:58 2022 +0100

    [refactor][streaming] Migrate Source(Operator)StreamTaskTest to JUnit5 and assertj
---
 .../tasks/SourceOperatorStreamTaskTest.java        | 61 +++++++-------
 .../runtime/tasks/SourceStreamTaskTest.java        | 93 +++++++++++-----------
 .../runtime/tasks/SourceStreamTaskTestBase.java    | 23 +++---
 3 files changed, 85 insertions(+), 92 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
index 3d9087f..0ea212a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
@@ -59,7 +59,8 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.LifeCycleMonitor.LifeCyclePhase;
 import org.apache.flink.util.SerializedValue;
 
-import org.junit.Test;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -75,29 +76,24 @@ import java.util.function.Supplier;
 import java.util.stream.IntStream;
 
 import static org.apache.flink.streaming.util.TestHarnessUtil.assertOutputEquals;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Tests for verifying that the {@link SourceOperator} as a task input can be integrated well with
  * {@link org.apache.flink.streaming.runtime.io.StreamOneInputProcessor}.
  */
-public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
+class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
+
     private static final OperatorID OPERATOR_ID = new OperatorID();
     private static final int NUM_RECORDS = 10;
 
     @Test
-    public void testMetrics() throws Exception {
+    void testMetrics() throws Exception {
         testMetrics(
                 SourceOperatorStreamTask::new,
                 new SourceOperatorFactory<>(
                         new MockSource(Boundedness.BOUNDED, 1), WatermarkStrategy.noWatermarks()),
-                lessThanOrEqualTo(1_000_000d));
+                busyTime -> busyTime.isLessThanOrEqualTo(1_000_000d));
     }
 
     /**
@@ -105,7 +101,7 @@ public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
      * operators.
      */
     @Test
-    public void testSnapshotAndRestore() throws Exception {
+    void testSnapshotAndRestore() throws Exception {
         // process NUM_RECORDS records and take a snapshot.
         TaskStateSnapshot taskStateSnapshot =
                 executeAndWaitForCheckpoint(1, null, IntStream.range(0, NUM_RECORDS));
@@ -116,7 +112,7 @@ public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
     }
 
     @Test
-    public void testSnapshotAndAdvanceToEndOfEventTime() throws Exception {
+    void testSnapshotAndAdvanceToEndOfEventTime() throws Exception {
         final int checkpointId = 1;
         try (StreamTaskMailboxTestHarness<Integer> testHarness =
                 createTestHarness(checkpointId, null)) {
@@ -139,7 +135,7 @@ public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
     }
 
     @Test
-    public void testEmittingMaxWatermarkAfterReadingAllRecords() throws Exception {
+    void testEmittingMaxWatermarkAfterReadingAllRecords() throws Exception {
         try (StreamTaskMailboxTestHarness<Integer> testHarness = createTestHarness()) {
             testHarness.processAll();
             testHarness.finishProcessing();
@@ -147,22 +143,22 @@ public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
             Queue<Object> expectedOutput = new LinkedList<>();
             expectedOutput.add(Watermark.MAX_WATERMARK);
             expectedOutput.add(new EndOfData(StopMode.DRAIN));
-            assertThat(testHarness.getOutput().toArray(), equalTo(expectedOutput.toArray()));
+            assertThat(testHarness.getOutput().toArray()).isEqualTo(expectedOutput.toArray());
         }
     }
 
     @Test
-    public void testNotEmittingMaxWatermarkAfterCancelling() throws Exception {
+    void testNotEmittingMaxWatermarkAfterCancelling() throws Exception {
         try (StreamTaskMailboxTestHarness<Integer> testHarness = createTestHarness()) {
             testHarness.getStreamTask().cancel();
             testHarness.finishProcessing();
 
-            assertThat(testHarness.getOutput(), hasSize(0));
+            assertThat(testHarness.getOutput()).hasSize(0);
         }
     }
 
     @Test
-    public void testExternallyInducedSource() throws Exception {
+    void testExternallyInducedSource() throws Exception {
         final int numEventsBeforeCheckpoint = 10;
         final int totalNumEvents = 20;
         TestingExternallyInducedSourceReader testingReader =
@@ -176,17 +172,18 @@ public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
 
             testHarness.processAll();
 
-            assertEquals(totalNumEvents, runtimeTestingReader.numEmittedEvents);
-            assertTrue(runtimeTestingReader.checkpointed);
-            assertEquals(
-                    TestingExternallyInducedSourceReader.CHECKPOINT_ID,
-                    runtimeTestingReader.checkpointedId);
-            assertEquals(numEventsBeforeCheckpoint, runtimeTestingReader.checkpointedAt);
+            assertThat(runtimeTestingReader.numEmittedEvents).isEqualTo(totalNumEvents);
+            assertThat(runtimeTestingReader.checkpointed).isTrue();
+            assertThat(runtimeTestingReader.checkpointedId)
+                    .isEqualTo(TestingExternallyInducedSourceReader.CHECKPOINT_ID);
+            assertThat(runtimeTestingReader.checkpointedAt).isEqualTo(numEventsBeforeCheckpoint);
+            Assertions.assertThat(testHarness.getOutput())
+                    .contains(new CheckpointBarrier(2, 2, checkpointOptions));
         }
     }
 
     @Test
-    public void testSkipExecutionIfFinishedOnRestore() throws Exception {
+    void testSkipExecutionIfFinishedOnRestore() throws Exception {
         TaskStateSnapshot taskStateSnapshot = TaskStateSnapshot.FINISHED_ON_RESTORE;
 
         LifeCycleMonitorSource testingSource =
@@ -215,7 +212,8 @@ public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
 
             testHarness.getStreamTask().invoke();
             testHarness.processAll();
-            assertThat(output, contains(Watermark.MAX_WATERMARK, new EndOfData(StopMode.DRAIN)));
+            assertThat(output)
+                    .containsExactly(Watermark.MAX_WATERMARK, new EndOfData(StopMode.DRAIN));
 
             LifeCycleMonitorSourceReader sourceReader =
                     (LifeCycleMonitorSourceReader)
@@ -226,7 +224,7 @@ public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
     }
 
     @Test
-    public void testTriggeringStopWithSavepointWithDrain() throws Exception {
+    void testTriggeringStopWithSavepointWithDrain() throws Exception {
         SourceOperatorFactory<Integer> sourceOperatorFactory =
                 new SourceOperatorFactory<>(
                         new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 2),
@@ -271,9 +269,9 @@ public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
             testHarness.waitForTaskCompletion();
             testHarness.finishProcessing();
 
-            assertTrue(triggerResult.isDone());
-            assertTrue(triggerResult.get());
-            assertTrue(checkpointCompleted.isDone());
+            assertThat(triggerResult.isDone()).isTrue();
+            assertThat(triggerResult.get()).isTrue();
+            assertThat(checkpointCompleted.isDone()).isTrue();
         }
     }
 
@@ -304,7 +302,8 @@ public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
             expectedOutput.add(
                     new CheckpointBarrier(checkpointId, checkpointId, checkpointOptions));
 
-            assertEquals(checkpointId, testHarness.taskStateManager.getReportedCheckpointId());
+            assertThat(testHarness.taskStateManager.getReportedCheckpointId())
+                    .isEqualTo(checkpointId);
             assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
 
             return testHarness.taskStateManager.getLastJobManagerTaskStateSnapshot();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index aa4b364..d16c5ab 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -70,8 +70,7 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.function.CheckedSupplier;
 
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -94,23 +93,18 @@ import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO;
 import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
 import static org.apache.flink.streaming.runtime.tasks.StreamTaskFinalCheckpointsTest.triggerCheckpoint;
 import static org.apache.flink.util.Preconditions.checkState;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
 /**
  * These tests verify that the RichFunction methods are called (in correct order). And that
  * checkpointing/element emission don't occur concurrently.
  */
-public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
+class SourceStreamTaskTest extends SourceStreamTaskTestBase {
 
     /** This test verifies that open() and close() are correctly called by the StreamTask. */
     @Test
-    public void testOpenClose() throws Exception {
+    void testOpenClose() throws Exception {
         final StreamTaskTestHarness<String> testHarness =
                 new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
 
@@ -124,15 +118,17 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
         testHarness.invoke();
         testHarness.waitForTaskCompletion();
 
-        assertTrue("RichFunction methods where not called.", OpenCloseTestSource.closeCalled);
+        assertThat(OpenCloseTestSource.closeCalled)
+                .as("RichFunction methods where not called.")
+                .isTrue();
 
         List<String> resultElements =
                 TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
-        Assert.assertEquals(10, resultElements.size());
+        assertThat(resultElements.size()).isEqualTo(10);
     }
 
     @Test
-    public void testMetrics() throws Exception {
+    void testMetrics() throws Exception {
         testMetrics(
                 SourceStreamTask::new,
                 SimpleOperatorFactory.of(
@@ -140,7 +136,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
                                 new CancelTestSource(
                                         INT_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                         42))),
-                is(Double.NaN));
+                busyTime -> busyTime.isNaN());
     }
 
     /**
@@ -157,7 +153,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
      */
     @Test
     @SuppressWarnings("unchecked")
-    public void testCheckpointing() throws Exception {
+    void testCheckpointing() throws Exception {
         final int numElements = 100;
         final int numCheckpoints = 100;
         final int numCheckpointers = 1;
@@ -214,14 +210,14 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
 
             List<Tuple2<Long, Integer>> resultElements =
                     TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
-            Assert.assertEquals(numElements, resultElements.size());
+            assertThat(resultElements.size()).isEqualTo(numElements);
         } finally {
             executor.shutdown();
         }
     }
 
     @Test
-    public void testClosingAllOperatorsOnChainProperly() throws Exception {
+    void testClosingAllOperatorsOnChainProperly() throws Exception {
         final StreamTaskTestHarness<String> testHarness =
                 new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
 
@@ -254,11 +250,11 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
                 new StreamRecord<>("[Operator1]: Finish"));
 
         final Object[] output = testHarness.getOutput().toArray();
-        assertArrayEquals("Output was not correct.", expected.toArray(), output);
+        assertThat(output).as("Output was not correct.").isEqualTo(expected.toArray());
     }
 
     @Test
-    public void testNotMarkingEndOfInputWhenTaskCancelled() throws Exception {
+    void testNotMarkingEndOfInputWhenTaskCancelled() throws Exception {
         final StreamTaskTestHarness<String> testHarness =
                 new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
 
@@ -299,22 +295,22 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
     }
 
     @Test
-    public void testCancellationWithSourceBlockedOnLock() throws Exception {
+    void testCancellationWithSourceBlockedOnLock() throws Exception {
         testCancellationWithSourceBlockedOnLock(false, false);
     }
 
     @Test
-    public void testCancellationWithSourceBlockedOnLockWithPendingMail() throws Exception {
+    void testCancellationWithSourceBlockedOnLockWithPendingMail() throws Exception {
         testCancellationWithSourceBlockedOnLock(true, false);
     }
 
     @Test
-    public void testCancellationWithSourceBlockedOnLockAndThrowingOnError() throws Exception {
+    void testCancellationWithSourceBlockedOnLockAndThrowingOnError() throws Exception {
         testCancellationWithSourceBlockedOnLock(false, true);
     }
 
     @Test
-    public void testCancellationWithSourceBlockedOnLockWithPendingMailAndThrowingOnError()
+    void testCancellationWithSourceBlockedOnLockWithPendingMailAndThrowingOnError()
             throws Exception {
         testCancellationWithSourceBlockedOnLock(true, true);
     }
@@ -324,8 +320,8 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
      * StreamTask} which, as of the time this test is being written, is not tested anywhere else
      * (like {@link StreamTaskTest} or {@link OneInputStreamTaskTest}).
      */
-    public void testCancellationWithSourceBlockedOnLock(
-            boolean withPendingMail, boolean throwInCancel) throws Exception {
+    void testCancellationWithSourceBlockedOnLock(boolean withPendingMail, boolean throwInCancel)
+            throws Exception {
         final StreamTaskTestHarness<String> testHarness =
                 new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
 
@@ -354,9 +350,9 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
                     .createExecutor(0)
                     .execute(
                             () ->
-                                    assertFalse(
-                                            "This should never execute before task cancelation",
-                                            testHarness.getTask().isRunning()),
+                                    assertThat(testHarness.getTask().isRunning())
+                                            .as("This should never execute before task cancelation")
+                                            .isFalse(),
                             "Test");
         }
 
@@ -427,12 +423,12 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
     }
 
     @Test
-    public void testInterruptionExceptionNotSwallowed() throws Exception {
+    void testInterruptionExceptionNotSwallowed() throws Exception {
         testInterruptionExceptionNotSwallowed(InterruptedException::new);
     }
 
     @Test
-    public void testWrappedInterruptionExceptionNotSwallowed() throws Exception {
+    void testWrappedInterruptionExceptionNotSwallowed() throws Exception {
         testInterruptionExceptionNotSwallowed(
                 () -> new RuntimeException(new FlinkRuntimeException(new InterruptedException())));
     }
@@ -491,7 +487,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
     }
 
     @Test
-    public void testWaitsForSourceThreadOnCancel() throws Exception {
+    void testWaitsForSourceThreadOnCancel() throws Exception {
         StreamTaskTestHarness<String> harness =
                 new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
 
@@ -504,13 +500,13 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
         // SourceStreamTask should be still waiting for NonStoppingSource after cancellation
         harness.getTask().cancel();
         harness.waitForTaskCompletion(50, true); // allow task to exit prematurely
-        assertTrue(harness.taskThread.isAlive());
+        assertThat(harness.taskThread.isAlive()).isTrue();
 
         // SourceStreamTask should be still waiting for NonStoppingSource after interruptions
         for (int i = 0; i < 10; i++) {
             harness.getTask().maybeInterruptOnCancel(harness.getTaskThread(), null, null);
             harness.waitForTaskCompletion(50, true); // allow task to exit prematurely
-            assertTrue(harness.taskThread.isAlive());
+            assertThat(harness.taskThread.isAlive()).isTrue();
         }
 
         // It should only exit once NonStoppingSource allows for it
@@ -519,7 +515,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
     }
 
     @Test
-    public void testTriggeringCheckpointAfterSourceThreadFinished() throws Exception {
+    void testTriggeringCheckpointAfterSourceThreadFinished() throws Exception {
         ResultPartition[] partitionWriters = new ResultPartition[2];
         try (NettyShuffleEnvironment env =
                 new NettyShuffleEnvironmentBuilder()
@@ -585,12 +581,12 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
                         (id, error) ->
                                 testHarness.getStreamTask().notifyCheckpointCompleteAsync(2));
                 testHarness.finishProcessing();
-                assertTrue(checkpointFuture.isDone());
+                assertThat(checkpointFuture.isDone()).isTrue();
 
                 // Each result partition should have emitted 1 barrier, 1 max watermark and 1
                 // EndOfUserRecordEvent.
                 for (ResultPartition resultPartition : partitionWriters) {
-                    assertEquals(3, resultPartition.getNumberOfQueuedBuffers());
+                    assertThat(resultPartition.getNumberOfQueuedBuffers()).isEqualTo(3);
                 }
             }
         } finally {
@@ -603,7 +599,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
     }
 
     @Test
-    public void testClosedOnRestoreSourceSkipExecution() throws Exception {
+    void testClosedOnRestoreSourceSkipExecution() throws Exception {
         LifeCycleMonitorSource testSource = new LifeCycleMonitorSource();
         List<Object> output = new ArrayList<>();
         try (StreamTaskMailboxTestHarness<String> harness =
@@ -626,7 +622,8 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
             harness.processAll();
             harness.streamTask.getCompletionFuture().get();
 
-            assertThat(output, contains(Watermark.MAX_WATERMARK, new EndOfData(StopMode.DRAIN)));
+            assertThat(output)
+                    .containsExactly(Watermark.MAX_WATERMARK, new EndOfData(StopMode.DRAIN));
 
             LifeCycleMonitorSource source =
                     (LifeCycleMonitorSource)
@@ -642,7 +639,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
     }
 
     @Test
-    public void testTriggeringStopWithSavepointWithDrain() throws Exception {
+    void testTriggeringStopWithSavepointWithDrain() throws Exception {
         SourceFunction<String> testSource = new EmptySource();
 
         CompletableFuture<Boolean> checkpointCompleted = new CompletableFuture<>();
@@ -684,9 +681,9 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
             harness.streamTask.runMailboxLoop();
             harness.finishProcessing();
 
-            assertTrue(triggerResult.isDone());
-            assertTrue(triggerResult.get());
-            assertTrue(checkpointCompleted.isDone());
+            assertThat(triggerResult.isDone()).isTrue();
+            assertThat(triggerResult.get()).isTrue();
+            assertThat(checkpointCompleted.isDone()).isTrue();
         }
     }
 
@@ -767,7 +764,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
         public List<Serializable> snapshotState(long checkpointId, long timestamp)
                 throws Exception {
             if (!semaphore.tryAcquire()) {
-                Assert.fail("Concurrent invocation of snapshotState.");
+                fail("Concurrent invocation of snapshotState.");
             }
             int startCount = count;
             lastCheckpointId = checkpointId;
@@ -780,7 +777,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
             if (startCount != count) {
                 semaphore.release();
                 // This means that next() was invoked while the snapshot was ongoing
-                Assert.fail("Count is different at start end end of snapshot.");
+                fail("Count is different at start end end of snapshot.");
             }
             semaphore.release();
             return Collections.singletonList(sum);
@@ -871,7 +868,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
         public void open(Configuration parameters) throws Exception {
             super.open(parameters);
             if (closeCalled) {
-                Assert.fail("Close called before open.");
+                fail("Close called before open.");
             }
             openCalled = true;
         }
@@ -880,7 +877,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
         public void close() throws Exception {
             super.close();
             if (!openCalled) {
-                Assert.fail("Open was not called before close.");
+                fail("Open was not called before close.");
             }
             closeCalled = true;
         }
@@ -888,7 +885,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
         @Override
         public void run(SourceContext<String> ctx) throws Exception {
             if (!openCalled) {
-                Assert.fail("Open was not called before run.");
+                fail("Open was not called before run.");
             }
             for (int i = 0; i < 10; i++) {
                 ctx.collect("Hello" + i);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTestBase.java
index 3065641..b477796 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTestBase.java
@@ -32,16 +32,15 @@ import org.apache.flink.util.function.FunctionWithException;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
 
-import org.hamcrest.Matcher;
+import org.assertj.core.api.AbstractDoubleAssert;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
+import java.util.function.Consumer;
 
 import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Common base class for testing source tasks. */
 public class SourceStreamTaskTestBase {
@@ -49,7 +48,7 @@ public class SourceStreamTaskTestBase {
             FunctionWithException<Environment, ? extends StreamTask<Integer, ?>, Exception>
                     taskFactory,
             StreamOperatorFactory<?> operatorFactory,
-            Matcher<Double> busyTimeMatcher)
+            Consumer<AbstractDoubleAssert<?>> busyTimeMatcher)
             throws Exception {
         long sleepTime = 42;
 
@@ -73,26 +72,24 @@ public class SourceStreamTaskTestBase {
             OneShotLatch checkpointAcknowledgeLatch = new OneShotLatch();
             harness.getCheckpointResponder().setAcknowledgeLatch(checkpointAcknowledgeLatch);
 
-            assertFalse(triggerFuture.isDone());
+            assertThat(triggerFuture).isNotDone();
             Thread.sleep(sleepTime);
             while (!triggerFuture.isDone()) {
                 harness.streamTask.runMailboxStep();
             }
             Gauge<Long> checkpointStartDelayGauge =
                     (Gauge<Long>) metrics.get(MetricNames.CHECKPOINT_START_DELAY_TIME);
-            assertThat(
-                    checkpointStartDelayGauge.getValue(),
-                    greaterThanOrEqualTo(sleepTime * 1_000_000));
+            assertThat(checkpointStartDelayGauge.getValue())
+                    .isGreaterThanOrEqualTo(sleepTime * 1_000_000);
             Gauge<Double> busyTimeGauge = (Gauge<Double>) metrics.get(MetricNames.TASK_BUSY_TIME);
-            assertThat(busyTimeGauge.getValue(), busyTimeMatcher);
+            busyTimeMatcher.accept(assertThat(busyTimeGauge.getValue()));
 
             checkpointAcknowledgeLatch.await();
             TestCheckpointResponder.AcknowledgeReport acknowledgeReport =
                     Iterables.getOnlyElement(
                             harness.getCheckpointResponder().getAcknowledgeReports());
-            assertThat(
-                    acknowledgeReport.getCheckpointMetrics().getCheckpointStartDelayNanos(),
-                    greaterThanOrEqualTo(sleepTime * 1_000_000));
+            assertThat(acknowledgeReport.getCheckpointMetrics().getCheckpointStartDelayNanos())
+                    .isGreaterThanOrEqualTo(sleepTime * 1_000_000);
         }
     }
 }