You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2022/03/29 19:43:43 UTC
[flink] 01/03: [refactor][streaming] Migrate Source(Operator)StreamTaskTest to JUnit5 and assertj
This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7aefdf2c6aefe8c24af30a4f28a59f2780503d21
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Thu Mar 17 22:07:58 2022 +0100
[refactor][streaming] Migrate Source(Operator)StreamTaskTest to JUnit5 and assertj
---
.../tasks/SourceOperatorStreamTaskTest.java | 61 +++++++-------
.../runtime/tasks/SourceStreamTaskTest.java | 93 +++++++++++-----------
.../runtime/tasks/SourceStreamTaskTestBase.java | 23 +++---
3 files changed, 85 insertions(+), 92 deletions(-)
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
index 3d9087f..0ea212a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
@@ -59,7 +59,8 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.LifeCycleMonitor.LifeCyclePhase;
import org.apache.flink.util.SerializedValue;
-import org.junit.Test;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.io.Serializable;
@@ -75,29 +76,24 @@ import java.util.function.Supplier;
import java.util.stream.IntStream;
import static org.apache.flink.streaming.util.TestHarnessUtil.assertOutputEquals;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
/**
* Tests for verifying that the {@link SourceOperator} as a task input can be integrated well with
* {@link org.apache.flink.streaming.runtime.io.StreamOneInputProcessor}.
*/
-public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
+class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
+
private static final OperatorID OPERATOR_ID = new OperatorID();
private static final int NUM_RECORDS = 10;
@Test
- public void testMetrics() throws Exception {
+ void testMetrics() throws Exception {
testMetrics(
SourceOperatorStreamTask::new,
new SourceOperatorFactory<>(
new MockSource(Boundedness.BOUNDED, 1), WatermarkStrategy.noWatermarks()),
- lessThanOrEqualTo(1_000_000d));
+ busyTime -> busyTime.isLessThanOrEqualTo(1_000_000d));
}
/**
@@ -105,7 +101,7 @@ public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
* operators.
*/
@Test
- public void testSnapshotAndRestore() throws Exception {
+ void testSnapshotAndRestore() throws Exception {
// process NUM_RECORDS records and take a snapshot.
TaskStateSnapshot taskStateSnapshot =
executeAndWaitForCheckpoint(1, null, IntStream.range(0, NUM_RECORDS));
@@ -116,7 +112,7 @@ public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
}
@Test
- public void testSnapshotAndAdvanceToEndOfEventTime() throws Exception {
+ void testSnapshotAndAdvanceToEndOfEventTime() throws Exception {
final int checkpointId = 1;
try (StreamTaskMailboxTestHarness<Integer> testHarness =
createTestHarness(checkpointId, null)) {
@@ -139,7 +135,7 @@ public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
}
@Test
- public void testEmittingMaxWatermarkAfterReadingAllRecords() throws Exception {
+ void testEmittingMaxWatermarkAfterReadingAllRecords() throws Exception {
try (StreamTaskMailboxTestHarness<Integer> testHarness = createTestHarness()) {
testHarness.processAll();
testHarness.finishProcessing();
@@ -147,22 +143,22 @@ public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
Queue<Object> expectedOutput = new LinkedList<>();
expectedOutput.add(Watermark.MAX_WATERMARK);
expectedOutput.add(new EndOfData(StopMode.DRAIN));
- assertThat(testHarness.getOutput().toArray(), equalTo(expectedOutput.toArray()));
+ assertThat(testHarness.getOutput().toArray()).isEqualTo(expectedOutput.toArray());
}
}
@Test
- public void testNotEmittingMaxWatermarkAfterCancelling() throws Exception {
+ void testNotEmittingMaxWatermarkAfterCancelling() throws Exception {
try (StreamTaskMailboxTestHarness<Integer> testHarness = createTestHarness()) {
testHarness.getStreamTask().cancel();
testHarness.finishProcessing();
- assertThat(testHarness.getOutput(), hasSize(0));
+ assertThat(testHarness.getOutput()).hasSize(0);
}
}
@Test
- public void testExternallyInducedSource() throws Exception {
+ void testExternallyInducedSource() throws Exception {
final int numEventsBeforeCheckpoint = 10;
final int totalNumEvents = 20;
TestingExternallyInducedSourceReader testingReader =
@@ -176,17 +172,18 @@ public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
testHarness.processAll();
- assertEquals(totalNumEvents, runtimeTestingReader.numEmittedEvents);
- assertTrue(runtimeTestingReader.checkpointed);
- assertEquals(
- TestingExternallyInducedSourceReader.CHECKPOINT_ID,
- runtimeTestingReader.checkpointedId);
- assertEquals(numEventsBeforeCheckpoint, runtimeTestingReader.checkpointedAt);
+ assertThat(runtimeTestingReader.numEmittedEvents).isEqualTo(totalNumEvents);
+ assertThat(runtimeTestingReader.checkpointed).isTrue();
+ assertThat(runtimeTestingReader.checkpointedId)
+ .isEqualTo(TestingExternallyInducedSourceReader.CHECKPOINT_ID);
+ assertThat(runtimeTestingReader.checkpointedAt).isEqualTo(numEventsBeforeCheckpoint);
+ Assertions.assertThat(testHarness.getOutput())
+ .contains(new CheckpointBarrier(2, 2, checkpointOptions));
}
}
@Test
- public void testSkipExecutionIfFinishedOnRestore() throws Exception {
+ void testSkipExecutionIfFinishedOnRestore() throws Exception {
TaskStateSnapshot taskStateSnapshot = TaskStateSnapshot.FINISHED_ON_RESTORE;
LifeCycleMonitorSource testingSource =
@@ -215,7 +212,8 @@ public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
testHarness.getStreamTask().invoke();
testHarness.processAll();
- assertThat(output, contains(Watermark.MAX_WATERMARK, new EndOfData(StopMode.DRAIN)));
+ assertThat(output)
+ .containsExactly(Watermark.MAX_WATERMARK, new EndOfData(StopMode.DRAIN));
LifeCycleMonitorSourceReader sourceReader =
(LifeCycleMonitorSourceReader)
@@ -226,7 +224,7 @@ public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
}
@Test
- public void testTriggeringStopWithSavepointWithDrain() throws Exception {
+ void testTriggeringStopWithSavepointWithDrain() throws Exception {
SourceOperatorFactory<Integer> sourceOperatorFactory =
new SourceOperatorFactory<>(
new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 2),
@@ -271,9 +269,9 @@ public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
testHarness.waitForTaskCompletion();
testHarness.finishProcessing();
- assertTrue(triggerResult.isDone());
- assertTrue(triggerResult.get());
- assertTrue(checkpointCompleted.isDone());
+ assertThat(triggerResult.isDone()).isTrue();
+ assertThat(triggerResult.get()).isTrue();
+ assertThat(checkpointCompleted.isDone()).isTrue();
}
}
@@ -304,7 +302,8 @@ public class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
expectedOutput.add(
new CheckpointBarrier(checkpointId, checkpointId, checkpointOptions));
- assertEquals(checkpointId, testHarness.taskStateManager.getReportedCheckpointId());
+ assertThat(testHarness.taskStateManager.getReportedCheckpointId())
+ .isEqualTo(checkpointId);
assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
return testHarness.taskStateManager.getLastJobManagerTaskStateSnapshot();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index aa4b364..d16c5ab 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -70,8 +70,7 @@ import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.function.CheckedSupplier;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.io.Serializable;
@@ -94,23 +93,18 @@ import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO;
import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
import static org.apache.flink.streaming.runtime.tasks.StreamTaskFinalCheckpointsTest.triggerCheckpoint;
import static org.apache.flink.util.Preconditions.checkState;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
/**
* These tests verify that the RichFunction methods are called (in correct order). And that
* checkpointing/element emission don't occur concurrently.
*/
-public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
+class SourceStreamTaskTest extends SourceStreamTaskTestBase {
/** This test verifies that open() and close() are correctly called by the StreamTask. */
@Test
- public void testOpenClose() throws Exception {
+ void testOpenClose() throws Exception {
final StreamTaskTestHarness<String> testHarness =
new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
@@ -124,15 +118,17 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
testHarness.invoke();
testHarness.waitForTaskCompletion();
- assertTrue("RichFunction methods where not called.", OpenCloseTestSource.closeCalled);
+ assertThat(OpenCloseTestSource.closeCalled)
+ .as("RichFunction methods where not called.")
+ .isTrue();
List<String> resultElements =
TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
- Assert.assertEquals(10, resultElements.size());
+ assertThat(resultElements.size()).isEqualTo(10);
}
@Test
- public void testMetrics() throws Exception {
+ void testMetrics() throws Exception {
testMetrics(
SourceStreamTask::new,
SimpleOperatorFactory.of(
@@ -140,7 +136,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
new CancelTestSource(
INT_TYPE_INFO.createSerializer(new ExecutionConfig()),
42))),
- is(Double.NaN));
+ busyTime -> busyTime.isNaN());
}
/**
@@ -157,7 +153,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
*/
@Test
@SuppressWarnings("unchecked")
- public void testCheckpointing() throws Exception {
+ void testCheckpointing() throws Exception {
final int numElements = 100;
final int numCheckpoints = 100;
final int numCheckpointers = 1;
@@ -214,14 +210,14 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
List<Tuple2<Long, Integer>> resultElements =
TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
- Assert.assertEquals(numElements, resultElements.size());
+ assertThat(resultElements.size()).isEqualTo(numElements);
} finally {
executor.shutdown();
}
}
@Test
- public void testClosingAllOperatorsOnChainProperly() throws Exception {
+ void testClosingAllOperatorsOnChainProperly() throws Exception {
final StreamTaskTestHarness<String> testHarness =
new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
@@ -254,11 +250,11 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
new StreamRecord<>("[Operator1]: Finish"));
final Object[] output = testHarness.getOutput().toArray();
- assertArrayEquals("Output was not correct.", expected.toArray(), output);
+ assertThat(output).as("Output was not correct.").isEqualTo(expected.toArray());
}
@Test
- public void testNotMarkingEndOfInputWhenTaskCancelled() throws Exception {
+ void testNotMarkingEndOfInputWhenTaskCancelled() throws Exception {
final StreamTaskTestHarness<String> testHarness =
new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
@@ -299,22 +295,22 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
}
@Test
- public void testCancellationWithSourceBlockedOnLock() throws Exception {
+ void testCancellationWithSourceBlockedOnLock() throws Exception {
testCancellationWithSourceBlockedOnLock(false, false);
}
@Test
- public void testCancellationWithSourceBlockedOnLockWithPendingMail() throws Exception {
+ void testCancellationWithSourceBlockedOnLockWithPendingMail() throws Exception {
testCancellationWithSourceBlockedOnLock(true, false);
}
@Test
- public void testCancellationWithSourceBlockedOnLockAndThrowingOnError() throws Exception {
+ void testCancellationWithSourceBlockedOnLockAndThrowingOnError() throws Exception {
testCancellationWithSourceBlockedOnLock(false, true);
}
@Test
- public void testCancellationWithSourceBlockedOnLockWithPendingMailAndThrowingOnError()
+ void testCancellationWithSourceBlockedOnLockWithPendingMailAndThrowingOnError()
throws Exception {
testCancellationWithSourceBlockedOnLock(true, true);
}
@@ -324,8 +320,8 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
* StreamTask} which, as of the time this test is being written, is not tested anywhere else
* (like {@link StreamTaskTest} or {@link OneInputStreamTaskTest}).
*/
- public void testCancellationWithSourceBlockedOnLock(
- boolean withPendingMail, boolean throwInCancel) throws Exception {
+ void testCancellationWithSourceBlockedOnLock(boolean withPendingMail, boolean throwInCancel)
+ throws Exception {
final StreamTaskTestHarness<String> testHarness =
new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
@@ -354,9 +350,9 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
.createExecutor(0)
.execute(
() ->
- assertFalse(
- "This should never execute before task cancelation",
- testHarness.getTask().isRunning()),
+ assertThat(testHarness.getTask().isRunning())
+ .as("This should never execute before task cancelation")
+ .isFalse(),
"Test");
}
@@ -427,12 +423,12 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
}
@Test
- public void testInterruptionExceptionNotSwallowed() throws Exception {
+ void testInterruptionExceptionNotSwallowed() throws Exception {
testInterruptionExceptionNotSwallowed(InterruptedException::new);
}
@Test
- public void testWrappedInterruptionExceptionNotSwallowed() throws Exception {
+ void testWrappedInterruptionExceptionNotSwallowed() throws Exception {
testInterruptionExceptionNotSwallowed(
() -> new RuntimeException(new FlinkRuntimeException(new InterruptedException())));
}
@@ -491,7 +487,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
}
@Test
- public void testWaitsForSourceThreadOnCancel() throws Exception {
+ void testWaitsForSourceThreadOnCancel() throws Exception {
StreamTaskTestHarness<String> harness =
new StreamTaskTestHarness<>(SourceStreamTask::new, STRING_TYPE_INFO);
@@ -504,13 +500,13 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
// SourceStreamTask should be still waiting for NonStoppingSource after cancellation
harness.getTask().cancel();
harness.waitForTaskCompletion(50, true); // allow task to exit prematurely
- assertTrue(harness.taskThread.isAlive());
+ assertThat(harness.taskThread.isAlive()).isTrue();
// SourceStreamTask should be still waiting for NonStoppingSource after interruptions
for (int i = 0; i < 10; i++) {
harness.getTask().maybeInterruptOnCancel(harness.getTaskThread(), null, null);
harness.waitForTaskCompletion(50, true); // allow task to exit prematurely
- assertTrue(harness.taskThread.isAlive());
+ assertThat(harness.taskThread.isAlive()).isTrue();
}
// It should only exit once NonStoppingSource allows for it
@@ -519,7 +515,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
}
@Test
- public void testTriggeringCheckpointAfterSourceThreadFinished() throws Exception {
+ void testTriggeringCheckpointAfterSourceThreadFinished() throws Exception {
ResultPartition[] partitionWriters = new ResultPartition[2];
try (NettyShuffleEnvironment env =
new NettyShuffleEnvironmentBuilder()
@@ -585,12 +581,12 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
(id, error) ->
testHarness.getStreamTask().notifyCheckpointCompleteAsync(2));
testHarness.finishProcessing();
- assertTrue(checkpointFuture.isDone());
+ assertThat(checkpointFuture.isDone()).isTrue();
// Each result partition should have emitted 1 barrier, 1 max watermark and 1
// EndOfUserRecordEvent.
for (ResultPartition resultPartition : partitionWriters) {
- assertEquals(3, resultPartition.getNumberOfQueuedBuffers());
+ assertThat(resultPartition.getNumberOfQueuedBuffers()).isEqualTo(3);
}
}
} finally {
@@ -603,7 +599,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
}
@Test
- public void testClosedOnRestoreSourceSkipExecution() throws Exception {
+ void testClosedOnRestoreSourceSkipExecution() throws Exception {
LifeCycleMonitorSource testSource = new LifeCycleMonitorSource();
List<Object> output = new ArrayList<>();
try (StreamTaskMailboxTestHarness<String> harness =
@@ -626,7 +622,8 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
harness.processAll();
harness.streamTask.getCompletionFuture().get();
- assertThat(output, contains(Watermark.MAX_WATERMARK, new EndOfData(StopMode.DRAIN)));
+ assertThat(output)
+ .containsExactly(Watermark.MAX_WATERMARK, new EndOfData(StopMode.DRAIN));
LifeCycleMonitorSource source =
(LifeCycleMonitorSource)
@@ -642,7 +639,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
}
@Test
- public void testTriggeringStopWithSavepointWithDrain() throws Exception {
+ void testTriggeringStopWithSavepointWithDrain() throws Exception {
SourceFunction<String> testSource = new EmptySource();
CompletableFuture<Boolean> checkpointCompleted = new CompletableFuture<>();
@@ -684,9 +681,9 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
harness.streamTask.runMailboxLoop();
harness.finishProcessing();
- assertTrue(triggerResult.isDone());
- assertTrue(triggerResult.get());
- assertTrue(checkpointCompleted.isDone());
+ assertThat(triggerResult.isDone()).isTrue();
+ assertThat(triggerResult.get()).isTrue();
+ assertThat(checkpointCompleted.isDone()).isTrue();
}
}
@@ -767,7 +764,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
public List<Serializable> snapshotState(long checkpointId, long timestamp)
throws Exception {
if (!semaphore.tryAcquire()) {
- Assert.fail("Concurrent invocation of snapshotState.");
+ fail("Concurrent invocation of snapshotState.");
}
int startCount = count;
lastCheckpointId = checkpointId;
@@ -780,7 +777,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
if (startCount != count) {
semaphore.release();
// This means that next() was invoked while the snapshot was ongoing
- Assert.fail("Count is different at start end end of snapshot.");
+ fail("Count is different at start end end of snapshot.");
}
semaphore.release();
return Collections.singletonList(sum);
@@ -871,7 +868,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
if (closeCalled) {
- Assert.fail("Close called before open.");
+ fail("Close called before open.");
}
openCalled = true;
}
@@ -880,7 +877,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
public void close() throws Exception {
super.close();
if (!openCalled) {
- Assert.fail("Open was not called before close.");
+ fail("Open was not called before close.");
}
closeCalled = true;
}
@@ -888,7 +885,7 @@ public class SourceStreamTaskTest extends SourceStreamTaskTestBase {
@Override
public void run(SourceContext<String> ctx) throws Exception {
if (!openCalled) {
- Assert.fail("Open was not called before run.");
+ fail("Open was not called before run.");
}
for (int i = 0; i < 10; i++) {
ctx.collect("Hello" + i);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTestBase.java
index 3065641..b477796 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTestBase.java
@@ -32,16 +32,15 @@ import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
-import org.hamcrest.Matcher;
+import org.assertj.core.api.AbstractDoubleAssert;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
+import java.util.function.Consumer;
import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
/** Common base class for testing source tasks. */
public class SourceStreamTaskTestBase {
@@ -49,7 +48,7 @@ public class SourceStreamTaskTestBase {
FunctionWithException<Environment, ? extends StreamTask<Integer, ?>, Exception>
taskFactory,
StreamOperatorFactory<?> operatorFactory,
- Matcher<Double> busyTimeMatcher)
+ Consumer<AbstractDoubleAssert<?>> busyTimeMatcher)
throws Exception {
long sleepTime = 42;
@@ -73,26 +72,24 @@ public class SourceStreamTaskTestBase {
OneShotLatch checkpointAcknowledgeLatch = new OneShotLatch();
harness.getCheckpointResponder().setAcknowledgeLatch(checkpointAcknowledgeLatch);
- assertFalse(triggerFuture.isDone());
+ assertThat(triggerFuture).isNotDone();
Thread.sleep(sleepTime);
while (!triggerFuture.isDone()) {
harness.streamTask.runMailboxStep();
}
Gauge<Long> checkpointStartDelayGauge =
(Gauge<Long>) metrics.get(MetricNames.CHECKPOINT_START_DELAY_TIME);
- assertThat(
- checkpointStartDelayGauge.getValue(),
- greaterThanOrEqualTo(sleepTime * 1_000_000));
+ assertThat(checkpointStartDelayGauge.getValue())
+ .isGreaterThanOrEqualTo(sleepTime * 1_000_000);
Gauge<Double> busyTimeGauge = (Gauge<Double>) metrics.get(MetricNames.TASK_BUSY_TIME);
- assertThat(busyTimeGauge.getValue(), busyTimeMatcher);
+ busyTimeMatcher.accept(assertThat(busyTimeGauge.getValue()));
checkpointAcknowledgeLatch.await();
TestCheckpointResponder.AcknowledgeReport acknowledgeReport =
Iterables.getOnlyElement(
harness.getCheckpointResponder().getAcknowledgeReports());
- assertThat(
- acknowledgeReport.getCheckpointMetrics().getCheckpointStartDelayNanos(),
- greaterThanOrEqualTo(sleepTime * 1_000_000));
+ assertThat(acknowledgeReport.getCheckpointMetrics().getCheckpointStartDelayNanos())
+ .isGreaterThanOrEqualTo(sleepTime * 1_000_000);
}
}
}