You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/09/28 09:23:24 UTC

[flink] 01/02: [hotfix] Migrate StreamingJobGraphGeneratorTest all tests to assertj and make testing method package private

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 32b95e7715c2eda0ff9523a593c798ba3b47c397
Author: Weijie Guo <re...@163.com>
AuthorDate: Wed Sep 28 11:47:41 2022 +0800

    [hotfix] Migrate StreamingJobGraphGeneratorTest all tests to assertj and make testing method package private
---
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 449 +++++++++------------
 1 file changed, 200 insertions(+), 249 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 7da1c0e3697..dc534008158 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -100,15 +100,14 @@ import org.apache.flink.streaming.util.TestAnyModeReadingStreamOperator;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.SerializedValue;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
 
 import org.assertj.core.api.Assertions;
 import org.assertj.core.data.Offset;
-import org.hamcrest.FeatureMatcher;
-import org.hamcrest.Matcher;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.lang.reflect.Method;
 import java.util.ArrayList;
@@ -126,17 +125,16 @@ import java.util.stream.Collectors;
 import static org.apache.flink.runtime.jobgraph.DistributionPattern.POINTWISE;
 import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.areOperatorsChainable;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
 
 /** Tests for {@link StreamingJobGraphGenerator}. */
+@ExtendWith(TestLoggerExtension.class)
 @SuppressWarnings("serial")
-public class StreamingJobGraphGeneratorTest extends TestLogger {
+class StreamingJobGraphGeneratorTest {
 
     @Test
-    public void testParallelismOneNotChained() {
+    void testParallelismOneNotChained() {
 
         // --------- the program ---------
 
@@ -179,16 +177,16 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         JobGraph jobGraph = streamGraph.getJobGraph();
         List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
 
-        Assertions.assertThat(jobGraph.getNumberOfVertices()).isEqualTo(2);
-        Assertions.assertThat(verticesSorted.get(0).getParallelism()).isEqualTo(1);
-        Assertions.assertThat(verticesSorted.get(1).getParallelism()).isEqualTo(1);
+        assertThat(jobGraph.getNumberOfVertices()).isEqualTo(2);
+        assertThat(verticesSorted.get(0).getParallelism()).isEqualTo(1);
+        assertThat(verticesSorted.get(1).getParallelism()).isEqualTo(1);
 
         JobVertex sourceVertex = verticesSorted.get(0);
         JobVertex mapSinkVertex = verticesSorted.get(1);
 
-        Assertions.assertThat(sourceVertex.getProducedDataSets().get(0).getResultType())
+        assertThat(sourceVertex.getProducedDataSets().get(0).getResultType())
                 .isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
-        Assertions.assertThat(mapSinkVertex.getInputs().get(0).getSource().getResultType())
+        assertThat(mapSinkVertex.getInputs().get(0).getSource().getResultType())
                 .isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
     }
 
@@ -197,40 +195,36 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
      * checkpoint mode to {@link CheckpointingMode#AT_LEAST_ONCE}.
      */
     @Test
-    public void testDisabledCheckpointing() throws Exception {
+    void testDisabledCheckpointing() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.fromElements(0).print();
         StreamGraph streamGraph = env.getStreamGraph();
-        Assertions.assertThat(streamGraph.getCheckpointConfig().isCheckpointingEnabled())
+        assertThat(streamGraph.getCheckpointConfig().isCheckpointingEnabled())
                 .withFailMessage("Checkpointing enabled")
                 .isFalse();
 
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
 
         JobCheckpointingSettings snapshottingSettings = jobGraph.getCheckpointingSettings();
-        Assertions.assertThat(
+        assertThat(
                         snapshottingSettings
                                 .getCheckpointCoordinatorConfiguration()
                                 .getCheckpointInterval())
                 .isEqualTo(Long.MAX_VALUE);
-        Assertions.assertThat(
-                        snapshottingSettings
-                                .getCheckpointCoordinatorConfiguration()
-                                .isExactlyOnce())
+        assertThat(snapshottingSettings.getCheckpointCoordinatorConfiguration().isExactlyOnce())
                 .isFalse();
 
         List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
         StreamConfig streamConfig = new StreamConfig(verticesSorted.get(0).getConfiguration());
-        Assertions.assertThat(streamConfig.getCheckpointMode())
-                .isEqualTo(CheckpointingMode.AT_LEAST_ONCE);
+        assertThat(streamConfig.getCheckpointMode()).isEqualTo(CheckpointingMode.AT_LEAST_ONCE);
     }
 
     @Test
-    public void testEnabledUnalignedCheckAndDisabledCheckpointing() {
+    void testEnabledUnalignedCheckAndDisabledCheckpointing() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.fromElements(0).print();
         StreamGraph streamGraph = env.getStreamGraph();
-        Assertions.assertThat(streamGraph.getCheckpointConfig().isCheckpointingEnabled())
+        assertThat(streamGraph.getCheckpointConfig().isCheckpointingEnabled())
                 .withFailMessage("Checkpointing enabled")
                 .isFalse();
         env.getCheckpointConfig().enableUnalignedCheckpoints(true);
@@ -239,13 +233,12 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 
         List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
         StreamConfig streamConfig = new StreamConfig(verticesSorted.get(0).getConfiguration());
-        Assertions.assertThat(streamConfig.getCheckpointMode())
-                .isEqualTo(CheckpointingMode.AT_LEAST_ONCE);
-        Assertions.assertThat(streamConfig.isUnalignedCheckpointsEnabled()).isFalse();
+        assertThat(streamConfig.getCheckpointMode()).isEqualTo(CheckpointingMode.AT_LEAST_ONCE);
+        assertThat(streamConfig.isUnalignedCheckpointsEnabled()).isFalse();
     }
 
     @Test
-    public void testUnalignedCheckAndAtLeastOnce() {
+    void testUnalignedCheckAndAtLeastOnce() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.fromElements(0).print();
         StreamGraph streamGraph = env.getStreamGraph();
@@ -256,13 +249,12 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 
         List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
         StreamConfig streamConfig = new StreamConfig(verticesSorted.get(0).getConfiguration());
-        Assertions.assertThat(streamConfig.getCheckpointMode())
-                .isEqualTo(CheckpointingMode.AT_LEAST_ONCE);
-        Assertions.assertThat(streamConfig.isUnalignedCheckpointsEnabled()).isFalse();
+        assertThat(streamConfig.getCheckpointMode()).isEqualTo(CheckpointingMode.AT_LEAST_ONCE);
+        assertThat(streamConfig.isUnalignedCheckpointsEnabled()).isFalse();
     }
 
     @Test
-    public void generatorForwardsSavepointRestoreSettings() {
+    void generatorForwardsSavepointRestoreSettings() {
         StreamGraph streamGraph =
                 new StreamGraph(
                         new ExecutionConfig(),
@@ -272,12 +264,12 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
 
         SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
-        assertThat(savepointRestoreSettings.getRestorePath(), is("hello"));
+        assertThat(savepointRestoreSettings.getRestorePath()).isEqualTo("hello");
     }
 
     /** Verifies that the chain start/end is correctly set. */
     @Test
-    public void testChainStartEndSetting() throws Exception {
+    void testChainStartEndSetting() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
         // set parallelism to 2 to avoid chaining with source in case when available processors is
@@ -300,9 +292,9 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         JobVertex sourceVertex = verticesSorted.get(0);
         JobVertex mapPrintVertex = verticesSorted.get(1);
 
-        Assertions.assertThat(sourceVertex.getProducedDataSets().get(0).getResultType())
+        assertThat(sourceVertex.getProducedDataSets().get(0).getResultType())
                 .isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
-        Assertions.assertThat(mapPrintVertex.getInputs().get(0).getSource().getResultType())
+        assertThat(mapPrintVertex.getInputs().get(0).getSource().getResultType())
                 .isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
 
         StreamConfig sourceConfig = new StreamConfig(sourceVertex.getConfiguration());
@@ -311,18 +303,18 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                 mapConfig.getTransitiveChainedTaskConfigs(getClass().getClassLoader());
         StreamConfig printConfig = chainedConfigs.values().iterator().next();
 
-        Assertions.assertThat(sourceConfig.isChainStart()).isTrue();
-        Assertions.assertThat(sourceConfig.isChainEnd()).isTrue();
+        assertThat(sourceConfig.isChainStart()).isTrue();
+        assertThat(sourceConfig.isChainEnd()).isTrue();
 
-        Assertions.assertThat(mapConfig.isChainStart()).isTrue();
-        Assertions.assertThat(mapConfig.isChainEnd()).isFalse();
+        assertThat(mapConfig.isChainStart()).isTrue();
+        assertThat(mapConfig.isChainEnd()).isFalse();
 
-        Assertions.assertThat(printConfig.isChainStart()).isFalse();
-        Assertions.assertThat(printConfig.isChainEnd()).isTrue();
+        assertThat(printConfig.isChainStart()).isFalse();
+        assertThat(printConfig.isChainEnd()).isTrue();
     }
 
     @Test
-    public void testOperatorCoordinatorAddedToJobVertex() {
+    void testOperatorCoordinatorAddedToJobVertex() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         DataStream<Integer> stream =
                 env.fromSource(
@@ -342,8 +334,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
 
-        Assertions.assertThat(jobGraph.getVerticesAsArray()[0].getOperatorCoordinators().size())
-                .isEqualTo(2);
+        assertThat(jobGraph.getVerticesAsArray()[0].getOperatorCoordinators()).hasSize(2);
     }
 
     /**
@@ -351,7 +342,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
      * sink cases) when generating job graph.
      */
     @Test
-    public void testResourcesForChainedSourceSink() throws Exception {
+    void testResourcesForChainedSourceSink() throws Exception {
         ResourceSpec resource1 = ResourceSpec.newBuilder(0.1, 100).build();
         ResourceSpec resource2 = ResourceSpec.newBuilder(0.2, 200).build();
         ResourceSpec resource3 = ResourceSpec.newBuilder(0.3, 300).build();
@@ -425,14 +416,10 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                 jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
         JobVertex reduceSinkVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
 
-        Assertions.assertThat(
-                        sourceMapFilterVertex
-                                .getMinResources()
-                                .equals(resource3.merge(resource2).merge(resource1)))
-                .isTrue();
-        Assertions.assertThat(
-                        reduceSinkVertex.getPreferredResources().equals(resource4.merge(resource5)))
-                .isTrue();
+        assertThat(sourceMapFilterVertex.getMinResources())
+                .isEqualTo(resource3.merge(resource2).merge(resource1));
+
+        assertThat(reduceSinkVertex.getPreferredResources()).isEqualTo(resource4.merge(resource5));
     }
 
     /**
@@ -440,7 +427,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
      * chaining and iteration cases) when generating job graph.
      */
     @Test
-    public void testResourcesForIteration() throws Exception {
+    void testResourcesForIteration() throws Exception {
         ResourceSpec resource1 = ResourceSpec.newBuilder(0.1, 100).build();
         ResourceSpec resource2 = ResourceSpec.newBuilder(0.2, 200).build();
         ResourceSpec resource3 = ResourceSpec.newBuilder(0.3, 300).build();
@@ -508,25 +495,21 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 
         for (JobVertex jobVertex : jobGraph.getVertices()) {
             if (jobVertex.getName().contains("test_source")) {
-                Assertions.assertThat(jobVertex.getMinResources().equals(resource1)).isTrue();
+                assertThat(jobVertex.getMinResources()).isEqualTo(resource1);
             } else if (jobVertex.getName().contains("Iteration_Source")) {
-                Assertions.assertThat(jobVertex.getPreferredResources().equals(resource2)).isTrue();
+                assertThat(jobVertex.getPreferredResources()).isEqualTo(resource2);
             } else if (jobVertex.getName().contains("test_flatMap")) {
-                Assertions.assertThat(
-                                jobVertex.getMinResources().equals(resource3.merge(resource4)))
-                        .isTrue();
+                assertThat(jobVertex.getMinResources()).isEqualTo(resource3.merge(resource4));
             } else if (jobVertex.getName().contains("Iteration_Tail")) {
-                Assertions.assertThat(
-                                jobVertex.getPreferredResources().equals(ResourceSpec.DEFAULT))
-                        .isTrue();
+                assertThat(jobVertex.getPreferredResources()).isEqualTo(ResourceSpec.DEFAULT);
             } else if (jobVertex.getName().contains("test_sink")) {
-                Assertions.assertThat(jobVertex.getMinResources().equals(resource5)).isTrue();
+                assertThat(jobVertex.getMinResources()).isEqualTo(resource5);
             }
         }
     }
 
     @Test
-    public void testInputOutputFormat() {
+    void testInputOutputFormat() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
         DataStream<Long> source =
@@ -543,10 +526,10 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 
         StreamGraph streamGraph = env.getStreamGraph();
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
-        Assertions.assertThat(jobGraph.getNumberOfVertices()).isEqualTo(1);
+        assertThat(jobGraph.getNumberOfVertices()).isEqualTo(1);
 
         JobVertex jobVertex = jobGraph.getVertices().iterator().next();
-        Assertions.assertThat(jobVertex instanceof InputOutputFormatVertex).isTrue();
+        assertThat(jobVertex).isInstanceOf(InputOutputFormatVertex.class);
 
         InputOutputFormatContainer formatContainer =
                 new InputOutputFormatContainer(
@@ -556,8 +539,8 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                 formatContainer.getInputFormats();
         Map<OperatorID, UserCodeWrapper<? extends OutputFormat<?>>> outputFormats =
                 formatContainer.getOutputFormats();
-        Assertions.assertThat(inputFormats.size()).isEqualTo(1);
-        Assertions.assertThat(outputFormats.size()).isEqualTo(2);
+        assertThat(inputFormats).hasSize(1);
+        assertThat(outputFormats).hasSize(2);
 
         Map<String, OperatorID> nameToOperatorIds = new HashMap<>();
         StreamConfig headConfig = new StreamConfig(jobVertex.getConfiguration());
@@ -572,19 +555,19 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 
         InputFormat<?, ?> sourceFormat =
                 inputFormats.get(nameToOperatorIds.get("Source: source")).getUserCodeObject();
-        Assertions.assertThat(sourceFormat instanceof TypeSerializerInputFormat).isTrue();
+        assertThat(sourceFormat).isInstanceOf(TypeSerializerInputFormat.class);
 
         OutputFormat<?> sinkFormat1 =
                 outputFormats.get(nameToOperatorIds.get("Sink: sink1")).getUserCodeObject();
-        Assertions.assertThat(sinkFormat1 instanceof DiscardingOutputFormat).isTrue();
+        assertThat(sinkFormat1).isInstanceOf(DiscardingOutputFormat.class);
 
         OutputFormat<?> sinkFormat2 =
                 outputFormats.get(nameToOperatorIds.get("Sink: sink2")).getUserCodeObject();
-        Assertions.assertThat(sinkFormat2 instanceof DiscardingOutputFormat).isTrue();
+        assertThat(sinkFormat2).isInstanceOf(DiscardingOutputFormat.class);
     }
 
     @Test
-    public void testCoordinatedOperator() {
+    void testCoordinatedOperator() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         DataStream<Integer> source =
                 env.fromSource(
@@ -596,26 +579,26 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         StreamGraph streamGraph = env.getStreamGraph();
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
         // There should be only one job vertex.
-        Assertions.assertThat(jobGraph.getNumberOfVertices()).isEqualTo(1);
+        assertThat(jobGraph.getNumberOfVertices()).isEqualTo(1);
 
         JobVertex jobVertex = jobGraph.getVerticesAsArray()[0];
         List<SerializedValue<OperatorCoordinator.Provider>> coordinatorProviders =
                 jobVertex.getOperatorCoordinators();
         // There should be only one coordinator provider.
-        Assertions.assertThat(coordinatorProviders.size()).isEqualTo(1);
+        assertThat(coordinatorProviders).hasSize(1);
         // The invokable class should be SourceOperatorStreamTask.
         final ClassLoader classLoader = getClass().getClassLoader();
-        Assertions.assertThat(jobVertex.getInvokableClass(classLoader))
+        assertThat(jobVertex.getInvokableClass(classLoader))
                 .isEqualTo(SourceOperatorStreamTask.class);
         StreamOperatorFactory operatorFactory =
                 new StreamConfig(jobVertex.getConfiguration())
                         .getStreamOperatorFactory(classLoader);
-        Assertions.assertThat(operatorFactory instanceof SourceOperatorFactory).isTrue();
+        assertThat(operatorFactory).isInstanceOf(SourceOperatorFactory.class);
     }
 
     /** Test setting exchange mode to {@link StreamExchangeMode#PIPELINED}. */
     @Test
-    public void testExchangeModePipelined() {
+    void testExchangeModePipelined() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         // fromElements -> Map -> Print
         DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 3);
@@ -642,19 +625,19 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
 
         List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
-        Assertions.assertThat(verticesSorted.size()).isEqualTo(2);
+        assertThat(verticesSorted).hasSize(2);
 
         // it can be chained with PIPELINED exchange mode
         JobVertex sourceAndMapVertex = verticesSorted.get(0);
 
         // PIPELINED exchange mode is translated into PIPELINED_BOUNDED result partition
-        Assertions.assertThat(sourceAndMapVertex.getProducedDataSets().get(0).getResultType())
+        assertThat(sourceAndMapVertex.getProducedDataSets().get(0).getResultType())
                 .isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
     }
 
     /** Test setting exchange mode to {@link StreamExchangeMode#BATCH}. */
     @Test
-    public void testExchangeModeBatch() {
+    void testExchangeModeBatch() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         env.setBufferTimeout(-1);
@@ -683,22 +666,22 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
 
         List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
-        Assertions.assertThat(verticesSorted.size()).isEqualTo(3);
+        assertThat(verticesSorted).hasSize(3);
 
         // it can not be chained with BATCH exchange mode
         JobVertex sourceVertex = verticesSorted.get(0);
         JobVertex mapVertex = verticesSorted.get(1);
 
         // BATCH exchange mode is translated into BLOCKING result partition
-        Assertions.assertThat(sourceVertex.getProducedDataSets().get(0).getResultType())
+        assertThat(sourceVertex.getProducedDataSets().get(0).getResultType())
                 .isEqualTo(ResultPartitionType.BLOCKING);
-        Assertions.assertThat(mapVertex.getProducedDataSets().get(0).getResultType())
+        assertThat(mapVertex.getProducedDataSets().get(0).getResultType())
                 .isEqualTo(ResultPartitionType.BLOCKING);
     }
 
     /** Test setting exchange mode to {@link StreamExchangeMode#UNDEFINED}. */
     @Test
-    public void testExchangeModeUndefined() {
+    void testExchangeModeUndefined() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         // fromElements -> Map -> Print
         DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 3);
@@ -725,13 +708,13 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
 
         List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
-        Assertions.assertThat(verticesSorted.size()).isEqualTo(2);
+        assertThat(verticesSorted).hasSize(2);
 
         // it can be chained with UNDEFINED exchange mode
         JobVertex sourceAndMapVertex = verticesSorted.get(0);
 
         // UNDEFINED exchange mode is translated into PIPELINED_BOUNDED result partition by default
-        Assertions.assertThat(sourceAndMapVertex.getProducedDataSets().get(0).getResultType())
+        assertThat(sourceAndMapVertex.getProducedDataSets().get(0).getResultType())
                 .isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
     }
 
@@ -764,13 +747,13 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
 
         List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
-        Assertions.assertThat(verticesSorted.size()).isEqualTo(2);
+        assertThat(verticesSorted).hasSize(2);
 
         // it can be chained with HYBRID_FULL exchange mode
         JobVertex sourceAndMapVertex = verticesSorted.get(0);
 
         // HYBRID_FULL exchange mode is translated into HYBRID_FULL result partition
-        Assertions.assertThat(sourceAndMapVertex.getProducedDataSets().get(0).getResultType())
+        assertThat(sourceAndMapVertex.getProducedDataSets().get(0).getResultType())
                 .isEqualTo(ResultPartitionType.HYBRID_FULL);
     }
 
@@ -803,35 +786,35 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
 
         List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
-        Assertions.assertThat(verticesSorted.size()).isEqualTo(2);
+        assertThat(verticesSorted).hasSize(2);
 
         // it can be chained with HYBRID_SELECTIVE exchange mode
         JobVertex sourceAndMapVertex = verticesSorted.get(0);
 
         // HYBRID_SELECTIVE exchange mode is translated into HYBRID_SELECTIVE result partition
-        Assertions.assertThat(sourceAndMapVertex.getProducedDataSets().get(0).getResultType())
+        assertThat(sourceAndMapVertex.getProducedDataSets().get(0).getResultType())
                 .isEqualTo(ResultPartitionType.HYBRID_SELECTIVE);
     }
 
     @Test
-    public void testStreamingJobTypeByDefault() {
+    void testStreamingJobTypeByDefault() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.fromElements("test").addSink(new DiscardingSink<>());
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
-        Assertions.assertThat(jobGraph.getJobType()).isEqualTo(JobType.STREAMING);
+        assertThat(jobGraph.getJobType()).isEqualTo(JobType.STREAMING);
     }
 
     @Test
-    public void testBatchJobType() {
+    void testBatchJobType() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         env.fromElements("test").addSink(new DiscardingSink<>());
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
-        Assertions.assertThat(jobGraph.getJobType()).isEqualTo(JobType.BATCH);
+        assertThat(jobGraph.getJobType()).isEqualTo(JobType.BATCH);
     }
 
     @Test
-    public void testPartitionTypesInBatchMode() {
+    void testPartitionTypesInBatchMode() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         env.setParallelism(4);
@@ -851,35 +834,25 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
         List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
-        assertThat(
-                verticesSorted.get(0) /* source - forward */,
-                hasOutputPartitionType(ResultPartitionType.BLOCKING));
-        assertThat(
-                verticesSorted.get(1) /* rescale */,
-                hasOutputPartitionType(ResultPartitionType.BLOCKING));
-        assertThat(
-                verticesSorted.get(2) /* rebalance */,
-                hasOutputPartitionType(ResultPartitionType.BLOCKING));
-        assertThat(
-                verticesSorted.get(3) /* keyBy */,
-                hasOutputPartitionType(ResultPartitionType.BLOCKING));
-        assertThat(
-                verticesSorted.get(4) /* forward - sink */,
-                hasOutputPartitionType(ResultPartitionType.BLOCKING));
+        assertHasOutputPartitionType(
+                verticesSorted.get(0) /* source - forward */, ResultPartitionType.BLOCKING);
+        assertHasOutputPartitionType(
+                verticesSorted.get(1) /* rescale */, ResultPartitionType.BLOCKING);
+        assertHasOutputPartitionType(
+                verticesSorted.get(2) /* rebalance */, ResultPartitionType.BLOCKING);
+        assertHasOutputPartitionType(
+                verticesSorted.get(3) /* keyBy */, ResultPartitionType.BLOCKING);
+        assertHasOutputPartitionType(
+                verticesSorted.get(4) /* forward - sink */, ResultPartitionType.BLOCKING);
     }
 
-    private Matcher<JobVertex> hasOutputPartitionType(ResultPartitionType partitionType) {
-        return new FeatureMatcher<JobVertex, ResultPartitionType>(
-                equalTo(partitionType), "output partition type", "output partition type") {
-            @Override
-            protected ResultPartitionType featureValueOf(JobVertex actual) {
-                return actual.getProducedDataSets().get(0).getResultType();
-            }
-        };
+    private void assertHasOutputPartitionType(
+            JobVertex jobVertex, ResultPartitionType partitionType) {
+        assertThat(jobVertex.getProducedDataSets().get(0).getResultType()).isEqualTo(partitionType);
     }
 
     @Test
-    public void testNormalExchangeModeWithBufferTimeout() {
+    void testNormalExchangeModeWithBufferTimeout() {
         testCompatibleExchangeModeWithBufferTimeout(StreamExchangeMode.PIPELINED);
     }
 
@@ -901,7 +874,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
     }
 
     @Test
-    public void testDisablingBufferTimeoutWithPipelinedExchanges() {
+    void testDisablingBufferTimeoutWithPipelinedExchanges() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
         env.setBufferTimeout(-1);
@@ -913,14 +886,14 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
             final StreamConfig streamConfig = new StreamConfig(vertex.getConfiguration());
             for (NonChainedOutput output :
                     streamConfig.getVertexNonChainedOutputs(this.getClass().getClassLoader())) {
-                assertThat(output.getBufferTimeout(), equalTo(-1L));
+                assertThat(output.getBufferTimeout()).isEqualTo(-1L);
             }
         }
     }
 
     /** Test iteration job, check slot sharing group and co-location group. */
     @Test
-    public void testIteration() {
+    void testIteration() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
         DataStream<Integer> source = env.fromElements(1, 2, 3).name("source");
@@ -933,45 +906,37 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
 
         SlotSharingGroup slotSharingGroup = jobGraph.getVerticesAsArray()[0].getSlotSharingGroup();
-        Assertions.assertThat(slotSharingGroup).isNotNull();
+        assertThat(slotSharingGroup).isNotNull();
 
         CoLocationGroup iterationSourceCoLocationGroup = null;
         CoLocationGroup iterationSinkCoLocationGroup = null;
 
         for (JobVertex jobVertex : jobGraph.getVertices()) {
             // all vertices have same slot sharing group by default
-            Assertions.assertThat(jobVertex.getSlotSharingGroup()).isEqualTo(slotSharingGroup);
+            assertThat(jobVertex.getSlotSharingGroup()).isEqualTo(slotSharingGroup);
 
             // all iteration vertices have same co-location group,
             // others have no co-location group by default
             if (jobVertex.getName().startsWith(StreamGraph.ITERATION_SOURCE_NAME_PREFIX)) {
                 iterationSourceCoLocationGroup = jobVertex.getCoLocationGroup();
-                Assertions.assertThat(
-                                iterationSourceCoLocationGroup
-                                        .getVertexIds()
-                                        .contains(jobVertex.getID()))
-                        .isTrue();
+                assertThat(iterationSourceCoLocationGroup.getVertexIds())
+                        .contains(jobVertex.getID());
             } else if (jobVertex.getName().startsWith(StreamGraph.ITERATION_SINK_NAME_PREFIX)) {
                 iterationSinkCoLocationGroup = jobVertex.getCoLocationGroup();
-                Assertions.assertThat(
-                                iterationSinkCoLocationGroup
-                                        .getVertexIds()
-                                        .contains(jobVertex.getID()))
-                        .isTrue();
+                assertThat(iterationSinkCoLocationGroup.getVertexIds()).contains(jobVertex.getID());
             } else {
-                Assertions.assertThat(jobVertex.getCoLocationGroup()).isNull();
+                assertThat(jobVertex.getCoLocationGroup()).isNull();
             }
         }
 
-        Assertions.assertThat(iterationSourceCoLocationGroup).isNotNull();
-        Assertions.assertThat(iterationSinkCoLocationGroup).isNotNull();
-        Assertions.assertThat(iterationSinkCoLocationGroup)
-                .isEqualTo(iterationSourceCoLocationGroup);
+        assertThat(iterationSourceCoLocationGroup).isNotNull();
+        assertThat(iterationSinkCoLocationGroup).isNotNull();
+        assertThat(iterationSinkCoLocationGroup).isEqualTo(iterationSourceCoLocationGroup);
     }
 
     /** Test default job type. */
     @Test
-    public void testDefaultJobType() {
+    void testDefaultJobType() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
         StreamGraph streamGraph =
@@ -979,11 +944,11 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                                 Collections.emptyList(), env.getConfig(), env.getCheckpointConfig())
                         .generate();
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
-        Assertions.assertThat(jobGraph.getJobType()).isEqualTo(JobType.STREAMING);
+        assertThat(jobGraph.getJobType()).isEqualTo(JobType.STREAMING);
     }
 
     @Test
-    public void testYieldingOperatorNotChainableToTaskChainedToLegacySource() {
+    void testYieldingOperatorNotChainableToTaskChainedToLegacySource() {
         StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment(1);
 
         chainEnv.fromElements(1)
@@ -998,16 +963,14 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                 streamGraph.getStreamNodes().stream()
                         .sorted(Comparator.comparingInt(StreamNode::getId))
                         .collect(Collectors.toList());
-        Assertions.assertThat(
-                        areOperatorsChainable(streamNodes.get(0), streamNodes.get(1), streamGraph))
+        assertThat(areOperatorsChainable(streamNodes.get(0), streamNodes.get(1), streamGraph))
                 .isTrue();
-        Assertions.assertThat(
-                        areOperatorsChainable(streamNodes.get(1), streamNodes.get(2), streamGraph))
+        assertThat(areOperatorsChainable(streamNodes.get(1), streamNodes.get(2), streamGraph))
                 .isFalse();
     }
 
     @Test
-    public void testYieldingOperatorChainableToTaskNotChainedToLegacySource() {
+    void testYieldingOperatorChainableToTaskNotChainedToLegacySource() {
         StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment(1);
 
         chainEnv.fromElements(1)
@@ -1022,11 +985,9 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                 streamGraph.getStreamNodes().stream()
                         .sorted(Comparator.comparingInt(StreamNode::getId))
                         .collect(Collectors.toList());
-        Assertions.assertThat(
-                        areOperatorsChainable(streamNodes.get(0), streamNodes.get(1), streamGraph))
+        assertThat(areOperatorsChainable(streamNodes.get(0), streamNodes.get(1), streamGraph))
                 .isFalse();
-        Assertions.assertThat(
-                        areOperatorsChainable(streamNodes.get(1), streamNodes.get(2), streamGraph))
+        assertThat(areOperatorsChainable(streamNodes.get(1), streamNodes.get(2), streamGraph))
                 .isTrue();
     }
 
@@ -1035,7 +996,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
      * chained to legacy sources, see FLINK-16219.
      */
     @Test
-    public void testYieldingOperatorProperlyChainedOnLegacySources() {
+    void testYieldingOperatorProperlyChainedOnLegacySources() {
         StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment(1);
 
         chainEnv.fromElements(1)
@@ -1050,9 +1011,9 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         final JobGraph jobGraph = chainEnv.getStreamGraph().getJobGraph();
 
         final List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
-        Assertions.assertThat(vertices.size()).isEqualTo(2);
-        Assertions.assertThat(vertices.get(0).getOperatorIDs().size()).isEqualTo(2);
-        Assertions.assertThat(vertices.get(1).getOperatorIDs().size()).isEqualTo(5);
+        assertThat(vertices).hasSize(2);
+        assertThat(vertices.get(0).getOperatorIDs()).hasSize(2);
+        assertThat(vertices.get(1).getOperatorIDs()).hasSize(5);
     }
 
     /**
@@ -1060,7 +1021,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
      * chained to new sources, see FLINK-20444.
      */
     @Test
-    public void testYieldingOperatorProperlyChainedOnNewSources() {
+    void testYieldingOperatorProperlyChainedOnNewSources() {
         StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment(1);
 
         chainEnv.fromSource(
@@ -1073,12 +1034,12 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         final JobGraph jobGraph = chainEnv.getStreamGraph().getJobGraph();
 
         final List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
-        Assertions.assertThat(vertices.size()).isEqualTo(1);
-        Assertions.assertThat(vertices.get(0).getOperatorIDs().size()).isEqualTo(4);
+        assertThat(vertices).hasSize(1);
+        assertThat(vertices.get(0).getOperatorIDs()).hasSize(4);
     }
 
     @Test
-    public void testDeterministicUnionOrder() {
+    void testDeterministicUnionOrder() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
         JobGraph jobGraph = getUnionJobGraph(env);
@@ -1092,14 +1053,14 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
             JobGraph jobGraph2 = getUnionJobGraph(env);
             JobVertex jobSink2 =
                     Iterables.getLast(jobGraph2.getVerticesSortedTopologicallyFromSources());
-            Assertions.assertThat(jobSink)
+            assertThat(jobSink)
                     .withFailMessage("Different runs should yield different vertexes")
                     .isNotEqualTo(jobSink2);
             List<String> actualSourceOrder =
                     jobSink2.getInputs().stream()
                             .map(edge -> edge.getSource().getProducer().getName())
                             .collect(Collectors.toList());
-            Assertions.assertThat(actualSourceOrder)
+            assertThat(actualSourceOrder)
                     .withFailMessage("Union inputs reordered")
                     .isEqualTo(expectedSourceOrder);
         }
@@ -1121,7 +1082,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
     }
 
     @Test
-    public void testNotSupportInputSelectableOperatorIfCheckpointing() {
+    void testNotSupportInputSelectableOperatorIfCheckpointing() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.enableCheckpointing(60_000L);
 
@@ -1134,13 +1095,12 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                         new TestAnyModeReadingStreamOperator("test operator"))
                 .print();
 
-        Assertions.assertThatThrownBy(
-                        () -> StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph()))
+        assertThatThrownBy(() -> StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph()))
                 .isInstanceOf(UnsupportedOperationException.class);
     }
 
     @Test
-    public void testManagedMemoryFractionForUnknownResourceSpec() throws Exception {
+    void testManagedMemoryFractionForUnknownResourceSpec() throws Exception {
         final ResourceSpec resource = ResourceSpec.UNKNOWN;
         final List<ResourceSpec> resourceSpecs =
                 Arrays.asList(resource, resource, resource, resource);
@@ -1295,20 +1255,20 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
             double expectedStateBackendFrac,
             Configuration tmConfig) {
         final double delta = 0.000001;
-        Assertions.assertThat(
+        assertThat(
                         streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(
                                 ManagedMemoryUseCase.STATE_BACKEND,
                                 tmConfig,
                                 ClassLoader.getSystemClassLoader()))
                 .isCloseTo(expectedStateBackendFrac, Offset.offset(delta));
-        Assertions.assertThat(
+        assertThat(
                         streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(
                                 ManagedMemoryUseCase.PYTHON,
                                 tmConfig,
                                 ClassLoader.getSystemClassLoader()))
                 .isCloseTo(expectedPythonFrac, Offset.offset(delta));
 
-        Assertions.assertThat(
+        assertThat(
                         streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(
                                 ManagedMemoryUseCase.OPERATOR,
                                 tmConfig,
@@ -1354,7 +1314,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
     }
 
     @Test
-    public void testSlotSharingOnAllVerticesInSameSlotSharingGroupByDefaultEnabled() {
+    void testSlotSharingOnAllVerticesInSameSlotSharingGroupByDefaultEnabled() {
         final StreamGraph streamGraph = createStreamGraphForSlotSharingTest(new Configuration());
         // specify slot sharing group for map1
         streamGraph.getStreamNodes().stream()
@@ -1366,7 +1326,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
 
         final List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
-        Assertions.assertThat(verticesSorted.size()).isEqualTo(4);
+        assertThat(verticesSorted).hasSize(4);
 
         final List<JobVertex> verticesMatched = getExpectedVerticesList(verticesSorted);
         final JobVertex source1Vertex = verticesMatched.get(0);
@@ -1381,13 +1341,13 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
     }
 
     @Test
-    public void testSlotSharingOnAllVerticesInSameSlotSharingGroupByDefaultDisabled() {
+    void testSlotSharingOnAllVerticesInSameSlotSharingGroupByDefaultDisabled() {
         final StreamGraph streamGraph = createStreamGraphForSlotSharingTest(new Configuration());
         streamGraph.setAllVerticesInSameSlotSharingGroupByDefault(false);
         final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
 
         final List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
-        Assertions.assertThat(verticesSorted.size()).isEqualTo(4);
+        assertThat(verticesSorted).hasSize(4);
 
         final List<JobVertex> verticesMatched = getExpectedVerticesList(verticesSorted);
         final JobVertex source1Vertex = verticesMatched.get(0);
@@ -1400,7 +1360,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
     }
 
     @Test
-    public void testSlotSharingResourceConfiguration() {
+    void testSlotSharingResourceConfiguration() {
         final String slotSharingGroup1 = "slot-a";
         final String slotSharingGroup2 = "slot-b";
         final ResourceProfile resourceProfile1 = ResourceProfile.fromResources(1, 10);
@@ -1431,25 +1391,25 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         for (JobVertex jobVertex : jobGraph.getVertices()) {
             numVertex += 1;
             if (jobVertex.getName().contains(slotSharingGroup1)) {
-                Assertions.assertThat(resourceProfile1)
-                        .isEqualTo(jobVertex.getSlotSharingGroup().getResourceProfile());
+                assertThat(jobVertex.getSlotSharingGroup().getResourceProfile())
+                        .isEqualTo(resourceProfile1);
             } else if (jobVertex.getName().contains(slotSharingGroup2)) {
-                Assertions.assertThat(resourceProfile2)
-                        .isEqualTo(jobVertex.getSlotSharingGroup().getResourceProfile());
+                assertThat(jobVertex.getSlotSharingGroup().getResourceProfile())
+                        .isEqualTo(resourceProfile2);
             } else if (jobVertex
                     .getName()
                     .contains(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP)) {
-                Assertions.assertThat(resourceProfile3)
-                        .isEqualTo(jobVertex.getSlotSharingGroup().getResourceProfile());
+                assertThat(jobVertex.getSlotSharingGroup().getResourceProfile())
+                        .isEqualTo(resourceProfile3);
             } else {
                 Assertions.fail("");
             }
         }
-        assertThat(numVertex, is(3));
+        assertThat(numVertex).isEqualTo(3);
     }
 
     @Test
-    public void testSlotSharingResourceConfigurationWithDefaultSlotSharingGroup() {
+    void testSlotSharingResourceConfigurationWithDefaultSlotSharingGroup() {
         final ResourceProfile resourceProfile = ResourceProfile.fromResources(1, 10);
         final Map<String, ResourceProfile> slotSharingGroupResource = new HashMap<>();
         slotSharingGroupResource.put(
@@ -1465,23 +1425,22 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         int numVertex = 0;
         for (JobVertex jobVertex : jobGraph.getVertices()) {
             numVertex += 1;
-            Assertions.assertThat(resourceProfile)
-                    .isEqualTo(jobVertex.getSlotSharingGroup().getResourceProfile());
+            assertThat(jobVertex.getSlotSharingGroup().getResourceProfile())
+                    .isEqualTo(resourceProfile);
         }
-        assertThat(numVertex, is(2));
+        assertThat(numVertex).isEqualTo(2);
     }
 
     @Test
-    public void testNamingOfChainedMultipleInputs() {
+    void testNamingOfChainedMultipleInputs() {
         String[] sources = new String[] {"source-1", "source-2", "source-3"};
         JobGraph graph = createGraphWithMultipleInputs(true, sources);
         JobVertex head = graph.getVerticesSortedTopologicallyFromSources().iterator().next();
-        Assertions.assertThat(sources)
-                .allMatch(source -> head.getOperatorPrettyName().contains(source));
+        assertThat(sources).allMatch(source -> head.getOperatorPrettyName().contains(source));
     }
 
     @Test
-    public void testNamingOfNonChainedMultipleInputs() {
+    void testNamingOfNonChainedMultipleInputs() {
         String[] sources = new String[] {"source-1", "source-2", "source-3"};
         JobGraph graph = createGraphWithMultipleInputs(false, sources);
         JobVertex head =
@@ -1490,12 +1449,10 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                         vertex ->
                                 vertex.getInvokableClassName()
                                         .equals(MultipleInputStreamTask.class.getName()));
-        Assertions.assertThat(head.getName().contains("source-1"))
-                .withFailMessage(head.getName())
-                .isFalse();
-        Assertions.assertThat(head.getOperatorPrettyName().contains("source-1"))
+        assertThat(head.getName()).withFailMessage(head.getName()).doesNotContain("source-1");
+        assertThat(head.getOperatorPrettyName())
                 .withFailMessage(head.getOperatorPrettyName())
-                .isFalse();
+                .doesNotContain("source-1");
     }
 
     public JobGraph createGraphWithMultipleInputs(boolean chain, String... inputNames) {
@@ -1521,12 +1478,12 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
     }
 
     @Test
-    public void testTreeDescription() {
+    void testTreeDescription() {
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         JobGraph job = createJobGraphWithDescription(env, "test source");
         JobVertex[] allVertices = job.getVerticesAsArray();
-        Assertions.assertThat(allVertices.length).isEqualTo(1);
-        Assertions.assertThat(allVertices[0].getOperatorPrettyName())
+        assertThat(allVertices).hasSize(1);
+        assertThat(allVertices[0].getOperatorPrettyName())
                 .isEqualTo(
                         "test source\n"
                                 + ":- x + 1\n"
@@ -1538,12 +1495,12 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
     }
 
     @Test
-    public void testTreeDescriptionWithChainedSource() {
+    void testTreeDescriptionWithChainedSource() {
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         JobGraph job = createJobGraphWithDescription(env, "test source 1", "test source 2");
         JobVertex[] allVertices = job.getVerticesAsArray();
-        Assertions.assertThat(allVertices.length).isEqualTo(1);
-        Assertions.assertThat(allVertices[0].getOperatorPrettyName())
+        assertThat(allVertices).hasSize(1);
+        assertThat(allVertices[0].getOperatorPrettyName())
                 .isEqualTo(
                         "operator chained with source [test source 1, test source 2]\n"
                                 + ":- x + 1\n"
@@ -1555,7 +1512,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
     }
 
     @Test
-    public void testCascadingDescription() {
+    void testCascadingDescription() {
         final Configuration config = new Configuration();
         config.set(
                 PipelineOptions.VERTEX_DESCRIPTION_MODE,
@@ -1564,14 +1521,14 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                 StreamExecutionEnvironment.getExecutionEnvironment(config);
         JobGraph job = createJobGraphWithDescription(env, "test source");
         JobVertex[] allVertices = job.getVerticesAsArray();
-        Assertions.assertThat(allVertices.length).isEqualTo(1);
-        Assertions.assertThat(allVertices[0].getOperatorPrettyName())
+        assertThat(allVertices).hasSize(1);
+        assertThat(allVertices[0].getOperatorPrettyName())
                 .isEqualTo(
                         "test source -> (x + 1 -> (first print of map1 , second print of map1) , x + 2 -> (first print of map2 , second print of map2))");
     }
 
     @Test
-    public void testCascadingDescriptionWithChainedSource() {
+    void testCascadingDescriptionWithChainedSource() {
         final Configuration config = new Configuration();
         config.set(
                 PipelineOptions.VERTEX_DESCRIPTION_MODE,
@@ -1580,38 +1537,38 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                 StreamExecutionEnvironment.getExecutionEnvironment(config);
         JobGraph job = createJobGraphWithDescription(env, "test source 1", "test source 2");
         JobVertex[] allVertices = job.getVerticesAsArray();
-        Assertions.assertThat(allVertices.length).isEqualTo(1);
-        Assertions.assertThat(allVertices[0].getOperatorPrettyName())
+        assertThat(allVertices).hasSize(1);
+        assertThat(allVertices[0].getOperatorPrettyName())
                 .isEqualTo(
                         "operator chained with source [test source 1, test source 2] -> (x + 1 -> (first print of map1 , second print of map1) , x + 2 -> (first print of map2 , second print of map2))");
     }
 
     @Test
-    public void testNamingWithoutIndex() {
+    void testNamingWithoutIndex() {
         JobGraph job = createStreamGraphForSlotSharingTest(new Configuration()).getJobGraph();
         List<JobVertex> allVertices = job.getVerticesSortedTopologicallyFromSources();
-        Assertions.assertThat(allVertices.size()).isEqualTo(4);
-        Assertions.assertThat(allVertices.get(0).getName()).isEqualTo("Source: source1");
-        Assertions.assertThat(allVertices.get(1).getName()).isEqualTo("Source: source2");
-        Assertions.assertThat(allVertices.get(2).getName()).isEqualTo("map1");
-        Assertions.assertThat(allVertices.get(3).getName()).isEqualTo("map2");
+        assertThat(allVertices).hasSize(4);
+        assertThat(allVertices.get(0).getName()).isEqualTo("Source: source1");
+        assertThat(allVertices.get(1).getName()).isEqualTo("Source: source2");
+        assertThat(allVertices.get(2).getName()).isEqualTo("map1");
+        assertThat(allVertices.get(3).getName()).isEqualTo("map2");
     }
 
     @Test
-    public void testNamingWithIndex() {
+    void testNamingWithIndex() {
         Configuration config = new Configuration();
         config.setBoolean(PipelineOptions.VERTEX_NAME_INCLUDE_INDEX_PREFIX, true);
         JobGraph job = createStreamGraphForSlotSharingTest(config).getJobGraph();
         List<JobVertex> allVertices = job.getVerticesSortedTopologicallyFromSources();
-        Assertions.assertThat(allVertices.size()).isEqualTo(4);
-        Assertions.assertThat(allVertices.get(0).getName()).isEqualTo("[vertex-0]Source: source1");
-        Assertions.assertThat(allVertices.get(1).getName()).isEqualTo("[vertex-1]Source: source2");
-        Assertions.assertThat(allVertices.get(2).getName()).isEqualTo("[vertex-2]map1");
-        Assertions.assertThat(allVertices.get(3).getName()).isEqualTo("[vertex-3]map2");
+        assertThat(allVertices).hasSize(4);
+        assertThat(allVertices.get(0).getName()).isEqualTo("[vertex-0]Source: source1");
+        assertThat(allVertices.get(1).getName()).isEqualTo("[vertex-1]Source: source2");
+        assertThat(allVertices.get(2).getName()).isEqualTo("[vertex-2]map1");
+        assertThat(allVertices.get(3).getName()).isEqualTo("[vertex-3]map2");
     }
 
     @Test
-    public void testCacheJobGraph() throws Throwable {
+    void testCacheJobGraph() throws Throwable {
         final TestingStreamExecutionEnvironment env = new TestingStreamExecutionEnvironment();
         env.setParallelism(2);
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
@@ -1619,8 +1576,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         DataStream<Integer> source = env.fromElements(1, 2, 3).name("source");
         CachedDataStream<Integer> cachedStream =
                 source.map(i -> i + 1).name("map-1").map(i -> i + 1).name("map-2").cache();
-        Assertions.assertThat(cachedStream.getTransformation())
-                .isInstanceOf(CacheTransformation.class);
+        assertThat(cachedStream.getTransformation()).isInstanceOf(CacheTransformation.class);
         CacheTransformation<Integer> cacheTransformation =
                 (CacheTransformation<Integer>) cachedStream.getTransformation();
 
@@ -1628,7 +1584,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 
         JobGraph jobGraph = env.getStreamGraph().getJobGraph();
         List<JobVertex> allVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
-        Assertions.assertThat(allVertices.size()).isEqualTo(3);
+        assertThat(allVertices).hasSize(3);
 
         final JobVertex cacheWriteVertex =
                 allVertices.stream()
@@ -1641,13 +1597,13 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                                                         "CacheWrite job vertex not found"));
 
         final List<JobEdge> inputs = cacheWriteVertex.getInputs();
-        Assertions.assertThat(inputs.size()).isEqualTo(1);
-        Assertions.assertThat(inputs.get(0).getDistributionPattern()).isEqualTo(POINTWISE);
-        Assertions.assertThat(inputs.get(0).getSource().getResultType())
+        assertThat(inputs).hasSize(1);
+        assertThat(inputs.get(0).getDistributionPattern()).isEqualTo(POINTWISE);
+        assertThat(inputs.get(0).getSource().getResultType())
                 .isEqualTo(ResultPartitionType.BLOCKING_PERSISTENT);
-        Assertions.assertThat(new AbstractID(inputs.get(0).getSourceId()))
+        assertThat(new AbstractID(inputs.get(0).getSourceId()))
                 .isEqualTo(cacheTransformation.getDatasetId());
-        Assertions.assertThat(inputs.get(0).getSource().getProducer().getName())
+        assertThat(inputs.get(0).getSource().getProducer().getName())
                 .isEqualTo("map-1 -> map-2 -> Sink: print");
 
         env.addCompletedClusterDataset(cacheTransformation.getDatasetId());
@@ -1655,13 +1611,10 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 
         jobGraph = env.getStreamGraph().getJobGraph();
         allVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
-        Assertions.assertThat(allVertices.size()).isEqualTo(1);
-        Assertions.assertThat(allVertices.get(0).getName()).isEqualTo("CacheRead -> Sink: print");
-        Assertions.assertThat(allVertices.get(0).getIntermediateDataSetIdsToConsume().size())
-                .isEqualTo(1);
-        Assertions.assertThat(
-                        new AbstractID(
-                                allVertices.get(0).getIntermediateDataSetIdsToConsume().get(0)))
+        assertThat(allVertices).hasSize(1);
+        assertThat(allVertices.get(0).getName()).isEqualTo("CacheRead -> Sink: print");
+        assertThat(allVertices.get(0).getIntermediateDataSetIdsToConsume()).hasSize(1);
+        assertThat(new AbstractID(allVertices.get(0).getIntermediateDataSetIdsToConsume().get(0)))
                 .isEqualTo(cacheTransformation.getDatasetId());
     }
 
@@ -1670,7 +1623,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
      * dataset if they have the same parallelism and partitioner.
      */
     @Test
-    public void testIntermediateDataSetReuse() {
+    void testIntermediateDataSetReuse() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setBufferTimeout(-1);
         DataStream<Integer> source = env.fromElements(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
@@ -1700,25 +1653,25 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
 
         List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
-        Assertions.assertThat(vertices.size()).isEqualTo(9);
+        assertThat(vertices).hasSize(9);
 
         JobVertex sourceVertex = vertices.get(0);
         List<IntermediateDataSetID> producedDataSet =
                 sourceVertex.getProducedDataSets().stream()
                         .map(IntermediateDataSet::getId)
                         .collect(Collectors.toList());
-        Assertions.assertThat(producedDataSet.size()).isEqualTo(6);
+        assertThat(producedDataSet).hasSize(6);
 
         JobVertex sinkVertex1 = checkNotNull(findJobVertexWithName(vertices, "sink1"));
         JobVertex sinkVertex2 = checkNotNull(findJobVertexWithName(vertices, "sink2"));
         JobVertex sinkVertex3 = checkNotNull(findJobVertexWithName(vertices, "sink3"));
         JobVertex sinkVertex4 = checkNotNull(findJobVertexWithName(vertices, "sink4"));
 
-        Assertions.assertThat(sinkVertex2.getInputs().get(0).getSource().getId())
+        assertThat(sinkVertex2.getInputs().get(0).getSource().getId())
                 .isEqualTo(sinkVertex1.getInputs().get(0).getSource().getId());
-        Assertions.assertThat(sinkVertex4.getInputs().get(0).getSource().getId())
+        assertThat(sinkVertex4.getInputs().get(0).getSource().getId())
                 .isEqualTo(sinkVertex3.getInputs().get(0).getSource().getId());
-        Assertions.assertThat(sinkVertex3.getInputs().get(0).getSource().getId())
+        assertThat(sinkVertex3.getInputs().get(0).getSource().getId())
                 .isNotEqualTo(sinkVertex1.getInputs().get(0).getSource().getId());
 
         StreamConfig streamConfig = new StreamConfig(sourceVertex.getConfiguration());
@@ -1726,18 +1679,16 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                 streamConfig.getOperatorNonChainedOutputs(getClass().getClassLoader()).stream()
                         .map(NonChainedOutput::getDataSetId)
                         .collect(Collectors.toList());
-        Assertions.assertThat(nonChainedOutputs.size()).isEqualTo(5);
-        Assertions.assertThat(
-                        nonChainedOutputs.contains(
-                                sinkVertex3.getInputs().get(0).getSource().getId()))
-                .isFalse();
+        assertThat(nonChainedOutputs).hasSize(5);
+        assertThat(nonChainedOutputs)
+                .doesNotContain(sinkVertex3.getInputs().get(0).getSource().getId());
 
         List<IntermediateDataSetID> streamOutputsInOrder =
                 streamConfig.getVertexNonChainedOutputs(getClass().getClassLoader()).stream()
                         .map(NonChainedOutput::getDataSetId)
                         .collect(Collectors.toList());
-        Assertions.assertThat(streamOutputsInOrder.size()).isEqualTo(6);
-        Assertions.assertThat(streamOutputsInOrder.toArray()).isEqualTo(producedDataSet.toArray());
+        assertThat(streamOutputsInOrder).hasSize(6);
+        assertThat(streamOutputsInOrder).isEqualTo(producedDataSet);
     }
 
     private static JobVertex findJobVertexWithName(List<JobVertex> vertices, String name) {
@@ -1841,7 +1792,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 
     private void assertSameSlotSharingGroup(JobVertex... vertices) {
         for (int i = 0; i < vertices.length - 1; i++) {
-            Assertions.assertThat(vertices[i + 1].getSlotSharingGroup())
+            assertThat(vertices[i + 1].getSlotSharingGroup())
                     .isEqualTo(vertices[i].getSlotSharingGroup());
         }
     }
@@ -1849,7 +1800,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
     private void assertDistinctSharingGroups(JobVertex... vertices) {
         for (int i = 0; i < vertices.length - 1; i++) {
             for (int j = i + 1; j < vertices.length; j++) {
-                Assertions.assertThat(vertices[i].getSlotSharingGroup())
+                assertThat(vertices[i].getSlotSharingGroup())
                         .isNotEqualTo(vertices[j].getSlotSharingGroup());
             }
         }