You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/09/28 09:23:23 UTC

[flink] branch release-1.16 updated (790434b9fb7 -> f66cb67687e)

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 790434b9fb7 [FLINK-29425] Hybrid full spilling strategy triggering spilling frequently
     new 32b95e7715c [hotfix] Migrate StreamingJobGraphGeneratorTest all tests to assertj and make testing method package private
     new f66cb67687e [FLINK-29431] Exceptions during job graph serialization lock up client

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/streaming/api/graph/StreamConfig.java    |  13 +-
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 554 ++++++++++++---------
 2 files changed, 310 insertions(+), 257 deletions(-)


[flink] 02/02: [FLINK-29431] Exceptions during job graph serialization lock up client

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f66cb67687e7df355006630d50b81307d281c150
Author: Weijie Guo <re...@163.com>
AuthorDate: Wed Sep 28 12:10:54 2022 +0800

    [FLINK-29431] Exceptions during job graph serialization lock up client
    
    This closes #20912
---
 .../flink/streaming/api/graph/StreamConfig.java    |  13 +--
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 105 +++++++++++++++++++++
 2 files changed, 110 insertions(+), 8 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 57cb1bca1ba..479b2004580 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -164,10 +164,9 @@ public class StreamConfig implements Serializable {
         FutureUtils.combineAll(chainedTaskFutures.values())
                 .thenAcceptAsync(
                         chainedConfigs -> {
-                            // Serialize all the objects to config.
-                            serializeAllConfigs();
-
                             try {
+                                // Serialize all the objects to config.
+                                serializeAllConfigs();
                                 InstantiationUtil.writeObjectToConfig(
                                         chainedConfigs.stream()
                                                 .collect(
@@ -176,12 +175,10 @@ public class StreamConfig implements Serializable {
                                                                 Function.identity())),
                                         this.config,
                                         CHAINED_TASK_CONFIG);
-                            } catch (IOException e) {
-                                throw new StreamTaskException(
-                                        "Could not serialize object for key chained task config.",
-                                        e);
+                                serializationFuture.complete(this);
+                            } catch (Throwable throwable) {
+                                serializationFuture.completeExceptionally(throwable);
                             }
-                            serializationFuture.complete(this);
                         },
                         ioExecutor);
         return serializationFuture;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index dc534008158..23a481c4ff0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -75,9 +75,11 @@ import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
@@ -94,6 +96,7 @@ import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
 import org.apache.flink.streaming.util.TestAnyModeReadingStreamOperator;
@@ -109,6 +112,8 @@ import org.assertj.core.data.Offset;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -1691,6 +1696,106 @@ class StreamingJobGraphGeneratorTest {
         assertThat(streamOutputsInOrder).isEqualTo(producedDataSet);
     }
 
+    @Test
+    void testStreamConfigSerializationException() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStreamSource<Integer> source = env.fromElements(1, 2, 3);
+        env.addOperator(
+                new OneInputTransformation<>(
+                        source.getTransformation(),
+                        "serializationTestOperator",
+                        // Use a non-serializable operator factory to trigger an IOException when
+                        // the StreamConfig is serialized.
+                        new SerializationTestOperatorFactory(false),
+                        Types.INT,
+                        1));
+        final StreamGraph streamGraph = env.getStreamGraph();
+        assertThatThrownBy(() -> StreamingJobGraphGenerator.createJobGraph(streamGraph))
+                .hasRootCauseInstanceOf(IOException.class)
+                .hasRootCauseMessage("This operator factory is not serializable.");
+    }
+
+    @Test
+    public void testCoordinatedSerializationException() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStreamSource<Integer> source = env.fromElements(1, 2, 3);
+        env.addOperator(
+                new OneInputTransformation<>(
+                        source.getTransformation(),
+                        "serializationTestOperator",
+                        new SerializationTestOperatorFactory(true),
+                        Types.INT,
+                        1));
+
+        StreamGraph streamGraph = env.getStreamGraph();
+        assertThatThrownBy(() -> StreamingJobGraphGenerator.createJobGraph(streamGraph))
+                .hasRootCauseInstanceOf(IOException.class)
+                .hasRootCauseMessage("This provider is not serializable.");
+    }
+
+    private static class SerializationTestOperatorFactory
+            extends AbstractStreamOperatorFactory<Integer>
+            implements CoordinatedOperatorFactory<Integer> {
+        private final boolean isOperatorFactorySerializable;
+
+        SerializationTestOperatorFactory(boolean isOperatorFactorySerializable) {
+            this.isOperatorFactorySerializable = isOperatorFactorySerializable;
+        }
+
+        @Override
+        public OperatorCoordinator.Provider getCoordinatorProvider(
+                String operatorName, OperatorID operatorID) {
+            return new NonSerializableCoordinatorProvider();
+        }
+
+        private void writeObject(ObjectOutputStream oos) throws IOException {
+            if (!isOperatorFactorySerializable) {
+                throw new IOException("This operator factory is not serializable.");
+            }
+        }
+
+        @Override
+        public <T extends StreamOperator<Integer>> T createStreamOperator(
+                StreamOperatorParameters<Integer> parameters) {
+            // today's lunch is generics spaghetti
+            @SuppressWarnings("unchecked")
+            final T castedOperator = (T) new SerializationTestOperator();
+            return castedOperator;
+        }
+
+        @Override
+        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+            return SerializationTestOperator.class;
+        }
+    }
+
+    private static class SerializationTestOperator extends AbstractStreamOperator<Integer>
+            implements OneInputStreamOperator<Integer, Integer> {
+
+        @Override
+        public void processElement(StreamRecord<Integer> element) throws Exception {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    private static class NonSerializableCoordinatorProvider
+            implements OperatorCoordinator.Provider {
+
+        @Override
+        public OperatorID getOperatorId() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public OperatorCoordinator create(OperatorCoordinator.Context context) throws Exception {
+            throw new UnsupportedOperationException();
+        }
+
+        private void writeObject(ObjectOutputStream oos) throws IOException {
+            throw new IOException("This provider is not serializable.");
+        }
+    }
+
     private static JobVertex findJobVertexWithName(List<JobVertex> vertices, String name) {
         for (JobVertex jobVertex : vertices) {
             if (jobVertex.getName().contains(name)) {


[flink] 01/02: [hotfix] Migrate StreamingJobGraphGeneratorTest all tests to assertj and make testing method package private

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 32b95e7715c2eda0ff9523a593c798ba3b47c397
Author: Weijie Guo <re...@163.com>
AuthorDate: Wed Sep 28 11:47:41 2022 +0800

    [hotfix] Migrate StreamingJobGraphGeneratorTest all tests to assertj and make testing method package private
---
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 449 +++++++++------------
 1 file changed, 200 insertions(+), 249 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 7da1c0e3697..dc534008158 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -100,15 +100,14 @@ import org.apache.flink.streaming.util.TestAnyModeReadingStreamOperator;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.SerializedValue;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
 
 import org.assertj.core.api.Assertions;
 import org.assertj.core.data.Offset;
-import org.hamcrest.FeatureMatcher;
-import org.hamcrest.Matcher;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.lang.reflect.Method;
 import java.util.ArrayList;
@@ -126,17 +125,16 @@ import java.util.stream.Collectors;
 import static org.apache.flink.runtime.jobgraph.DistributionPattern.POINTWISE;
 import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.areOperatorsChainable;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
 
 /** Tests for {@link StreamingJobGraphGenerator}. */
+@ExtendWith(TestLoggerExtension.class)
 @SuppressWarnings("serial")
-public class StreamingJobGraphGeneratorTest extends TestLogger {
+class StreamingJobGraphGeneratorTest {
 
     @Test
-    public void testParallelismOneNotChained() {
+    void testParallelismOneNotChained() {
 
         // --------- the program ---------
 
@@ -179,16 +177,16 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         JobGraph jobGraph = streamGraph.getJobGraph();
         List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
 
-        Assertions.assertThat(jobGraph.getNumberOfVertices()).isEqualTo(2);
-        Assertions.assertThat(verticesSorted.get(0).getParallelism()).isEqualTo(1);
-        Assertions.assertThat(verticesSorted.get(1).getParallelism()).isEqualTo(1);
+        assertThat(jobGraph.getNumberOfVertices()).isEqualTo(2);
+        assertThat(verticesSorted.get(0).getParallelism()).isEqualTo(1);
+        assertThat(verticesSorted.get(1).getParallelism()).isEqualTo(1);
 
         JobVertex sourceVertex = verticesSorted.get(0);
         JobVertex mapSinkVertex = verticesSorted.get(1);
 
-        Assertions.assertThat(sourceVertex.getProducedDataSets().get(0).getResultType())
+        assertThat(sourceVertex.getProducedDataSets().get(0).getResultType())
                 .isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
-        Assertions.assertThat(mapSinkVertex.getInputs().get(0).getSource().getResultType())
+        assertThat(mapSinkVertex.getInputs().get(0).getSource().getResultType())
                 .isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
     }
 
@@ -197,40 +195,36 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
      * checkpoint mode to {@link CheckpointingMode#AT_LEAST_ONCE}.
      */
     @Test
-    public void testDisabledCheckpointing() throws Exception {
+    void testDisabledCheckpointing() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.fromElements(0).print();
         StreamGraph streamGraph = env.getStreamGraph();
-        Assertions.assertThat(streamGraph.getCheckpointConfig().isCheckpointingEnabled())
+        assertThat(streamGraph.getCheckpointConfig().isCheckpointingEnabled())
                 .withFailMessage("Checkpointing enabled")
                 .isFalse();
 
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
 
         JobCheckpointingSettings snapshottingSettings = jobGraph.getCheckpointingSettings();
-        Assertions.assertThat(
+        assertThat(
                         snapshottingSettings
                                 .getCheckpointCoordinatorConfiguration()
                                 .getCheckpointInterval())
                 .isEqualTo(Long.MAX_VALUE);
-        Assertions.assertThat(
-                        snapshottingSettings
-                                .getCheckpointCoordinatorConfiguration()
-                                .isExactlyOnce())
+        assertThat(snapshottingSettings.getCheckpointCoordinatorConfiguration().isExactlyOnce())
                 .isFalse();
 
         List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
         StreamConfig streamConfig = new StreamConfig(verticesSorted.get(0).getConfiguration());
-        Assertions.assertThat(streamConfig.getCheckpointMode())
-                .isEqualTo(CheckpointingMode.AT_LEAST_ONCE);
+        assertThat(streamConfig.getCheckpointMode()).isEqualTo(CheckpointingMode.AT_LEAST_ONCE);
     }
 
     @Test
-    public void testEnabledUnalignedCheckAndDisabledCheckpointing() {
+    void testEnabledUnalignedCheckAndDisabledCheckpointing() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.fromElements(0).print();
         StreamGraph streamGraph = env.getStreamGraph();
-        Assertions.assertThat(streamGraph.getCheckpointConfig().isCheckpointingEnabled())
+        assertThat(streamGraph.getCheckpointConfig().isCheckpointingEnabled())
                 .withFailMessage("Checkpointing enabled")
                 .isFalse();
         env.getCheckpointConfig().enableUnalignedCheckpoints(true);
@@ -239,13 +233,12 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 
         List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
         StreamConfig streamConfig = new StreamConfig(verticesSorted.get(0).getConfiguration());
-        Assertions.assertThat(streamConfig.getCheckpointMode())
-                .isEqualTo(CheckpointingMode.AT_LEAST_ONCE);
-        Assertions.assertThat(streamConfig.isUnalignedCheckpointsEnabled()).isFalse();
+        assertThat(streamConfig.getCheckpointMode()).isEqualTo(CheckpointingMode.AT_LEAST_ONCE);
+        assertThat(streamConfig.isUnalignedCheckpointsEnabled()).isFalse();
     }
 
     @Test
-    public void testUnalignedCheckAndAtLeastOnce() {
+    void testUnalignedCheckAndAtLeastOnce() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.fromElements(0).print();
         StreamGraph streamGraph = env.getStreamGraph();
@@ -256,13 +249,12 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 
         List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
         StreamConfig streamConfig = new StreamConfig(verticesSorted.get(0).getConfiguration());
-        Assertions.assertThat(streamConfig.getCheckpointMode())
-                .isEqualTo(CheckpointingMode.AT_LEAST_ONCE);
-        Assertions.assertThat(streamConfig.isUnalignedCheckpointsEnabled()).isFalse();
+        assertThat(streamConfig.getCheckpointMode()).isEqualTo(CheckpointingMode.AT_LEAST_ONCE);
+        assertThat(streamConfig.isUnalignedCheckpointsEnabled()).isFalse();
     }
 
     @Test
-    public void generatorForwardsSavepointRestoreSettings() {
+    void generatorForwardsSavepointRestoreSettings() {
         StreamGraph streamGraph =
                 new StreamGraph(
                         new ExecutionConfig(),
@@ -272,12 +264,12 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
 
         SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
-        assertThat(savepointRestoreSettings.getRestorePath(), is("hello"));
+        assertThat(savepointRestoreSettings.getRestorePath()).isEqualTo("hello");
     }
 
     /** Verifies that the chain start/end is correctly set. */
     @Test
-    public void testChainStartEndSetting() throws Exception {
+    void testChainStartEndSetting() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
         // set parallelism to 2 to avoid chaining with source in case when available processors is
@@ -300,9 +292,9 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         JobVertex sourceVertex = verticesSorted.get(0);
         JobVertex mapPrintVertex = verticesSorted.get(1);
 
-        Assertions.assertThat(sourceVertex.getProducedDataSets().get(0).getResultType())
+        assertThat(sourceVertex.getProducedDataSets().get(0).getResultType())
                 .isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
-        Assertions.assertThat(mapPrintVertex.getInputs().get(0).getSource().getResultType())
+        assertThat(mapPrintVertex.getInputs().get(0).getSource().getResultType())
                 .isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
 
         StreamConfig sourceConfig = new StreamConfig(sourceVertex.getConfiguration());
@@ -311,18 +303,18 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                 mapConfig.getTransitiveChainedTaskConfigs(getClass().getClassLoader());
         StreamConfig printConfig = chainedConfigs.values().iterator().next();
 
-        Assertions.assertThat(sourceConfig.isChainStart()).isTrue();
-        Assertions.assertThat(sourceConfig.isChainEnd()).isTrue();
+        assertThat(sourceConfig.isChainStart()).isTrue();
+        assertThat(sourceConfig.isChainEnd()).isTrue();
 
-        Assertions.assertThat(mapConfig.isChainStart()).isTrue();
-        Assertions.assertThat(mapConfig.isChainEnd()).isFalse();
+        assertThat(mapConfig.isChainStart()).isTrue();
+        assertThat(mapConfig.isChainEnd()).isFalse();
 
-        Assertions.assertThat(printConfig.isChainStart()).isFalse();
-        Assertions.assertThat(printConfig.isChainEnd()).isTrue();
+        assertThat(printConfig.isChainStart()).isFalse();
+        assertThat(printConfig.isChainEnd()).isTrue();
     }
 
     @Test
-    public void testOperatorCoordinatorAddedToJobVertex() {
+    void testOperatorCoordinatorAddedToJobVertex() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         DataStream<Integer> stream =
                 env.fromSource(
@@ -342,8 +334,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
 
-        Assertions.assertThat(jobGraph.getVerticesAsArray()[0].getOperatorCoordinators().size())
-                .isEqualTo(2);
+        assertThat(jobGraph.getVerticesAsArray()[0].getOperatorCoordinators()).hasSize(2);
     }
 
     /**
@@ -351,7 +342,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
      * sink cases) when generating job graph.
      */
     @Test
-    public void testResourcesForChainedSourceSink() throws Exception {
+    void testResourcesForChainedSourceSink() throws Exception {
         ResourceSpec resource1 = ResourceSpec.newBuilder(0.1, 100).build();
         ResourceSpec resource2 = ResourceSpec.newBuilder(0.2, 200).build();
         ResourceSpec resource3 = ResourceSpec.newBuilder(0.3, 300).build();
@@ -425,14 +416,10 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                 jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
         JobVertex reduceSinkVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
 
-        Assertions.assertThat(
-                        sourceMapFilterVertex
-                                .getMinResources()
-                                .equals(resource3.merge(resource2).merge(resource1)))
-                .isTrue();
-        Assertions.assertThat(
-                        reduceSinkVertex.getPreferredResources().equals(resource4.merge(resource5)))
-                .isTrue();
+        assertThat(sourceMapFilterVertex.getMinResources())
+                .isEqualTo(resource3.merge(resource2).merge(resource1));
+
+        assertThat(reduceSinkVertex.getPreferredResources()).isEqualTo(resource4.merge(resource5));
     }
 
     /**
@@ -440,7 +427,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
      * chaining and iteration cases) when generating job graph.
      */
     @Test
-    public void testResourcesForIteration() throws Exception {
+    void testResourcesForIteration() throws Exception {
         ResourceSpec resource1 = ResourceSpec.newBuilder(0.1, 100).build();
         ResourceSpec resource2 = ResourceSpec.newBuilder(0.2, 200).build();
         ResourceSpec resource3 = ResourceSpec.newBuilder(0.3, 300).build();
@@ -508,25 +495,21 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 
         for (JobVertex jobVertex : jobGraph.getVertices()) {
             if (jobVertex.getName().contains("test_source")) {
-                Assertions.assertThat(jobVertex.getMinResources().equals(resource1)).isTrue();
+                assertThat(jobVertex.getMinResources()).isEqualTo(resource1);
             } else if (jobVertex.getName().contains("Iteration_Source")) {
-                Assertions.assertThat(jobVertex.getPreferredResources().equals(resource2)).isTrue();
+                assertThat(jobVertex.getPreferredResources()).isEqualTo(resource2);
             } else if (jobVertex.getName().contains("test_flatMap")) {
-                Assertions.assertThat(
-                                jobVertex.getMinResources().equals(resource3.merge(resource4)))
-                        .isTrue();
+                assertThat(jobVertex.getMinResources()).isEqualTo(resource3.merge(resource4));
             } else if (jobVertex.getName().contains("Iteration_Tail")) {
-                Assertions.assertThat(
-                                jobVertex.getPreferredResources().equals(ResourceSpec.DEFAULT))
-                        .isTrue();
+                assertThat(jobVertex.getPreferredResources()).isEqualTo(ResourceSpec.DEFAULT);
             } else if (jobVertex.getName().contains("test_sink")) {
-                Assertions.assertThat(jobVertex.getMinResources().equals(resource5)).isTrue();
+                assertThat(jobVertex.getMinResources()).isEqualTo(resource5);
             }
         }
     }
 
     @Test
-    public void testInputOutputFormat() {
+    void testInputOutputFormat() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
         DataStream<Long> source =
@@ -543,10 +526,10 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 
         StreamGraph streamGraph = env.getStreamGraph();
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
-        Assertions.assertThat(jobGraph.getNumberOfVertices()).isEqualTo(1);
+        assertThat(jobGraph.getNumberOfVertices()).isEqualTo(1);
 
         JobVertex jobVertex = jobGraph.getVertices().iterator().next();
-        Assertions.assertThat(jobVertex instanceof InputOutputFormatVertex).isTrue();
+        assertThat(jobVertex).isInstanceOf(InputOutputFormatVertex.class);
 
         InputOutputFormatContainer formatContainer =
                 new InputOutputFormatContainer(
@@ -556,8 +539,8 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                 formatContainer.getInputFormats();
         Map<OperatorID, UserCodeWrapper<? extends OutputFormat<?>>> outputFormats =
                 formatContainer.getOutputFormats();
-        Assertions.assertThat(inputFormats.size()).isEqualTo(1);
-        Assertions.assertThat(outputFormats.size()).isEqualTo(2);
+        assertThat(inputFormats).hasSize(1);
+        assertThat(outputFormats).hasSize(2);
 
         Map<String, OperatorID> nameToOperatorIds = new HashMap<>();
         StreamConfig headConfig = new StreamConfig(jobVertex.getConfiguration());
@@ -572,19 +555,19 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 
         InputFormat<?, ?> sourceFormat =
                 inputFormats.get(nameToOperatorIds.get("Source: source")).getUserCodeObject();
-        Assertions.assertThat(sourceFormat instanceof TypeSerializerInputFormat).isTrue();
+        assertThat(sourceFormat).isInstanceOf(TypeSerializerInputFormat.class);
 
         OutputFormat<?> sinkFormat1 =
                 outputFormats.get(nameToOperatorIds.get("Sink: sink1")).getUserCodeObject();
-        Assertions.assertThat(sinkFormat1 instanceof DiscardingOutputFormat).isTrue();
+        assertThat(sinkFormat1).isInstanceOf(DiscardingOutputFormat.class);
 
         OutputFormat<?> sinkFormat2 =
                 outputFormats.get(nameToOperatorIds.get("Sink: sink2")).getUserCodeObject();
-        Assertions.assertThat(sinkFormat2 instanceof DiscardingOutputFormat).isTrue();
+        assertThat(sinkFormat2).isInstanceOf(DiscardingOutputFormat.class);
     }
 
     @Test
-    public void testCoordinatedOperator() {
+    void testCoordinatedOperator() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         DataStream<Integer> source =
                 env.fromSource(
@@ -596,26 +579,26 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         StreamGraph streamGraph = env.getStreamGraph();
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
         // There should be only one job vertex.
-        Assertions.assertThat(jobGraph.getNumberOfVertices()).isEqualTo(1);
+        assertThat(jobGraph.getNumberOfVertices()).isEqualTo(1);
 
         JobVertex jobVertex = jobGraph.getVerticesAsArray()[0];
         List<SerializedValue<OperatorCoordinator.Provider>> coordinatorProviders =
                 jobVertex.getOperatorCoordinators();
         // There should be only one coordinator provider.
-        Assertions.assertThat(coordinatorProviders.size()).isEqualTo(1);
+        assertThat(coordinatorProviders).hasSize(1);
         // The invokable class should be SourceOperatorStreamTask.
         final ClassLoader classLoader = getClass().getClassLoader();
-        Assertions.assertThat(jobVertex.getInvokableClass(classLoader))
+        assertThat(jobVertex.getInvokableClass(classLoader))
                 .isEqualTo(SourceOperatorStreamTask.class);
         StreamOperatorFactory operatorFactory =
                 new StreamConfig(jobVertex.getConfiguration())
                         .getStreamOperatorFactory(classLoader);
-        Assertions.assertThat(operatorFactory instanceof SourceOperatorFactory).isTrue();
+        assertThat(operatorFactory).isInstanceOf(SourceOperatorFactory.class);
     }
 
     /** Test setting exchange mode to {@link StreamExchangeMode#PIPELINED}. */
     @Test
-    public void testExchangeModePipelined() {
+    void testExchangeModePipelined() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         // fromElements -> Map -> Print
         DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 3);
@@ -642,19 +625,19 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
 
         List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
-        Assertions.assertThat(verticesSorted.size()).isEqualTo(2);
+        assertThat(verticesSorted).hasSize(2);
 
         // it can be chained with PIPELINED exchange mode
         JobVertex sourceAndMapVertex = verticesSorted.get(0);
 
         // PIPELINED exchange mode is translated into PIPELINED_BOUNDED result partition
-        Assertions.assertThat(sourceAndMapVertex.getProducedDataSets().get(0).getResultType())
+        assertThat(sourceAndMapVertex.getProducedDataSets().get(0).getResultType())
                 .isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
     }
 
     /** Test setting exchange mode to {@link StreamExchangeMode#BATCH}. */
     @Test
-    public void testExchangeModeBatch() {
+    void testExchangeModeBatch() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         env.setBufferTimeout(-1);
@@ -683,22 +666,22 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
 
         List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
-        Assertions.assertThat(verticesSorted.size()).isEqualTo(3);
+        assertThat(verticesSorted).hasSize(3);
 
         // it can not be chained with BATCH exchange mode
         JobVertex sourceVertex = verticesSorted.get(0);
         JobVertex mapVertex = verticesSorted.get(1);
 
         // BATCH exchange mode is translated into BLOCKING result partition
-        Assertions.assertThat(sourceVertex.getProducedDataSets().get(0).getResultType())
+        assertThat(sourceVertex.getProducedDataSets().get(0).getResultType())
                 .isEqualTo(ResultPartitionType.BLOCKING);
-        Assertions.assertThat(mapVertex.getProducedDataSets().get(0).getResultType())
+        assertThat(mapVertex.getProducedDataSets().get(0).getResultType())
                 .isEqualTo(ResultPartitionType.BLOCKING);
     }
 
     /** Test setting exchange mode to {@link StreamExchangeMode#UNDEFINED}. */
     @Test
-    public void testExchangeModeUndefined() {
+    void testExchangeModeUndefined() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         // fromElements -> Map -> Print
         DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 3);
@@ -725,13 +708,13 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
 
         List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
-        Assertions.assertThat(verticesSorted.size()).isEqualTo(2);
+        assertThat(verticesSorted).hasSize(2);
 
         // it can be chained with UNDEFINED exchange mode
         JobVertex sourceAndMapVertex = verticesSorted.get(0);
 
         // UNDEFINED exchange mode is translated into PIPELINED_BOUNDED result partition by default
-        Assertions.assertThat(sourceAndMapVertex.getProducedDataSets().get(0).getResultType())
+        assertThat(sourceAndMapVertex.getProducedDataSets().get(0).getResultType())
                 .isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
     }
 
@@ -764,13 +747,13 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
 
         List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
-        Assertions.assertThat(verticesSorted.size()).isEqualTo(2);
+        assertThat(verticesSorted).hasSize(2);
 
         // it can be chained with HYBRID_FULL exchange mode
         JobVertex sourceAndMapVertex = verticesSorted.get(0);
 
         // HYBRID_FULL exchange mode is translated into HYBRID_FULL result partition
-        Assertions.assertThat(sourceAndMapVertex.getProducedDataSets().get(0).getResultType())
+        assertThat(sourceAndMapVertex.getProducedDataSets().get(0).getResultType())
                 .isEqualTo(ResultPartitionType.HYBRID_FULL);
     }
 
@@ -803,35 +786,35 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
 
         List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
-        Assertions.assertThat(verticesSorted.size()).isEqualTo(2);
+        assertThat(verticesSorted).hasSize(2);
 
         // it can be chained with HYBRID_SELECTIVE exchange mode
         JobVertex sourceAndMapVertex = verticesSorted.get(0);
 
         // HYBRID_SELECTIVE exchange mode is translated into HYBRID_SELECTIVE result partition
-        Assertions.assertThat(sourceAndMapVertex.getProducedDataSets().get(0).getResultType())
+        assertThat(sourceAndMapVertex.getProducedDataSets().get(0).getResultType())
                 .isEqualTo(ResultPartitionType.HYBRID_SELECTIVE);
     }
 
     @Test
-    public void testStreamingJobTypeByDefault() {
+    void testStreamingJobTypeByDefault() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.fromElements("test").addSink(new DiscardingSink<>());
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
-        Assertions.assertThat(jobGraph.getJobType()).isEqualTo(JobType.STREAMING);
+        assertThat(jobGraph.getJobType()).isEqualTo(JobType.STREAMING);
     }
 
     @Test
-    public void testBatchJobType() {
+    void testBatchJobType() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         env.fromElements("test").addSink(new DiscardingSink<>());
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
-        Assertions.assertThat(jobGraph.getJobType()).isEqualTo(JobType.BATCH);
+        assertThat(jobGraph.getJobType()).isEqualTo(JobType.BATCH);
     }
 
     @Test
-    public void testPartitionTypesInBatchMode() {
+    void testPartitionTypesInBatchMode() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         env.setParallelism(4);
@@ -851,35 +834,25 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
         List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
-        assertThat(
-                verticesSorted.get(0) /* source - forward */,
-                hasOutputPartitionType(ResultPartitionType.BLOCKING));
-        assertThat(
-                verticesSorted.get(1) /* rescale */,
-                hasOutputPartitionType(ResultPartitionType.BLOCKING));
-        assertThat(
-                verticesSorted.get(2) /* rebalance */,
-                hasOutputPartitionType(ResultPartitionType.BLOCKING));
-        assertThat(
-                verticesSorted.get(3) /* keyBy */,
-                hasOutputPartitionType(ResultPartitionType.BLOCKING));
-        assertThat(
-                verticesSorted.get(4) /* forward - sink */,
-                hasOutputPartitionType(ResultPartitionType.BLOCKING));
+        assertHasOutputPartitionType(
+                verticesSorted.get(0) /* source - forward */, ResultPartitionType.BLOCKING);
+        assertHasOutputPartitionType(
+                verticesSorted.get(1) /* rescale */, ResultPartitionType.BLOCKING);
+        assertHasOutputPartitionType(
+                verticesSorted.get(2) /* rebalance */, ResultPartitionType.BLOCKING);
+        assertHasOutputPartitionType(
+                verticesSorted.get(3) /* keyBy */, ResultPartitionType.BLOCKING);
+        assertHasOutputPartitionType(
+                verticesSorted.get(4) /* forward - sink */, ResultPartitionType.BLOCKING);
     }
 
-    private Matcher<JobVertex> hasOutputPartitionType(ResultPartitionType partitionType) {
-        return new FeatureMatcher<JobVertex, ResultPartitionType>(
-                equalTo(partitionType), "output partition type", "output partition type") {
-            @Override
-            protected ResultPartitionType featureValueOf(JobVertex actual) {
-                return actual.getProducedDataSets().get(0).getResultType();
-            }
-        };
+    private void assertHasOutputPartitionType(
+            JobVertex jobVertex, ResultPartitionType partitionType) {
+        assertThat(jobVertex.getProducedDataSets().get(0).getResultType()).isEqualTo(partitionType);
     }
 
     @Test
-    public void testNormalExchangeModeWithBufferTimeout() {
+    void testNormalExchangeModeWithBufferTimeout() {
         testCompatibleExchangeModeWithBufferTimeout(StreamExchangeMode.PIPELINED);
     }
 
@@ -901,7 +874,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
     }
 
     @Test
-    public void testDisablingBufferTimeoutWithPipelinedExchanges() {
+    void testDisablingBufferTimeoutWithPipelinedExchanges() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
         env.setBufferTimeout(-1);
@@ -913,14 +886,14 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
             final StreamConfig streamConfig = new StreamConfig(vertex.getConfiguration());
             for (NonChainedOutput output :
                     streamConfig.getVertexNonChainedOutputs(this.getClass().getClassLoader())) {
-                assertThat(output.getBufferTimeout(), equalTo(-1L));
+                assertThat(output.getBufferTimeout()).isEqualTo(-1L);
             }
         }
     }
 
     /** Test iteration job, check slot sharing group and co-location group. */
     @Test
-    public void testIteration() {
+    void testIteration() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
         DataStream<Integer> source = env.fromElements(1, 2, 3).name("source");
@@ -933,45 +906,37 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
 
         SlotSharingGroup slotSharingGroup = jobGraph.getVerticesAsArray()[0].getSlotSharingGroup();
-        Assertions.assertThat(slotSharingGroup).isNotNull();
+        assertThat(slotSharingGroup).isNotNull();
 
         CoLocationGroup iterationSourceCoLocationGroup = null;
         CoLocationGroup iterationSinkCoLocationGroup = null;
 
         for (JobVertex jobVertex : jobGraph.getVertices()) {
             // all vertices have same slot sharing group by default
-            Assertions.assertThat(jobVertex.getSlotSharingGroup()).isEqualTo(slotSharingGroup);
+            assertThat(jobVertex.getSlotSharingGroup()).isEqualTo(slotSharingGroup);
 
             // all iteration vertices have same co-location group,
             // others have no co-location group by default
             if (jobVertex.getName().startsWith(StreamGraph.ITERATION_SOURCE_NAME_PREFIX)) {
                 iterationSourceCoLocationGroup = jobVertex.getCoLocationGroup();
-                Assertions.assertThat(
-                                iterationSourceCoLocationGroup
-                                        .getVertexIds()
-                                        .contains(jobVertex.getID()))
-                        .isTrue();
+                assertThat(iterationSourceCoLocationGroup.getVertexIds())
+                        .contains(jobVertex.getID());
             } else if (jobVertex.getName().startsWith(StreamGraph.ITERATION_SINK_NAME_PREFIX)) {
                 iterationSinkCoLocationGroup = jobVertex.getCoLocationGroup();
-                Assertions.assertThat(
-                                iterationSinkCoLocationGroup
-                                        .getVertexIds()
-                                        .contains(jobVertex.getID()))
-                        .isTrue();
+                assertThat(iterationSinkCoLocationGroup.getVertexIds()).contains(jobVertex.getID());
             } else {
-                Assertions.assertThat(jobVertex.getCoLocationGroup()).isNull();
+                assertThat(jobVertex.getCoLocationGroup()).isNull();
             }
         }
 
-        Assertions.assertThat(iterationSourceCoLocationGroup).isNotNull();
-        Assertions.assertThat(iterationSinkCoLocationGroup).isNotNull();
-        Assertions.assertThat(iterationSinkCoLocationGroup)
-                .isEqualTo(iterationSourceCoLocationGroup);
+        assertThat(iterationSourceCoLocationGroup).isNotNull();
+        assertThat(iterationSinkCoLocationGroup).isNotNull();
+        assertThat(iterationSinkCoLocationGroup).isEqualTo(iterationSourceCoLocationGroup);
     }
 
     /** Test default job type. */
     @Test
-    public void testDefaultJobType() {
+    void testDefaultJobType() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
         StreamGraph streamGraph =
@@ -979,11 +944,11 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                                 Collections.emptyList(), env.getConfig(), env.getCheckpointConfig())
                         .generate();
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
-        Assertions.assertThat(jobGraph.getJobType()).isEqualTo(JobType.STREAMING);
+        assertThat(jobGraph.getJobType()).isEqualTo(JobType.STREAMING);
     }
 
     @Test
-    public void testYieldingOperatorNotChainableToTaskChainedToLegacySource() {
+    void testYieldingOperatorNotChainableToTaskChainedToLegacySource() {
         StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment(1);
 
         chainEnv.fromElements(1)
@@ -998,16 +963,14 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                 streamGraph.getStreamNodes().stream()
                         .sorted(Comparator.comparingInt(StreamNode::getId))
                         .collect(Collectors.toList());
-        Assertions.assertThat(
-                        areOperatorsChainable(streamNodes.get(0), streamNodes.get(1), streamGraph))
+        assertThat(areOperatorsChainable(streamNodes.get(0), streamNodes.get(1), streamGraph))
                 .isTrue();
-        Assertions.assertThat(
-                        areOperatorsChainable(streamNodes.get(1), streamNodes.get(2), streamGraph))
+        assertThat(areOperatorsChainable(streamNodes.get(1), streamNodes.get(2), streamGraph))
                 .isFalse();
     }
 
     @Test
-    public void testYieldingOperatorChainableToTaskNotChainedToLegacySource() {
+    void testYieldingOperatorChainableToTaskNotChainedToLegacySource() {
         StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment(1);
 
         chainEnv.fromElements(1)
@@ -1022,11 +985,9 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                 streamGraph.getStreamNodes().stream()
                         .sorted(Comparator.comparingInt(StreamNode::getId))
                         .collect(Collectors.toList());
-        Assertions.assertThat(
-                        areOperatorsChainable(streamNodes.get(0), streamNodes.get(1), streamGraph))
+        assertThat(areOperatorsChainable(streamNodes.get(0), streamNodes.get(1), streamGraph))
                 .isFalse();
-        Assertions.assertThat(
-                        areOperatorsChainable(streamNodes.get(1), streamNodes.get(2), streamGraph))
+        assertThat(areOperatorsChainable(streamNodes.get(1), streamNodes.get(2), streamGraph))
                 .isTrue();
     }
 
@@ -1035,7 +996,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
      * chained to legacy sources, see FLINK-16219.
      */
     @Test
-    public void testYieldingOperatorProperlyChainedOnLegacySources() {
+    void testYieldingOperatorProperlyChainedOnLegacySources() {
         StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment(1);
 
         chainEnv.fromElements(1)
@@ -1050,9 +1011,9 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         final JobGraph jobGraph = chainEnv.getStreamGraph().getJobGraph();
 
         final List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
-        Assertions.assertThat(vertices.size()).isEqualTo(2);
-        Assertions.assertThat(vertices.get(0).getOperatorIDs().size()).isEqualTo(2);
-        Assertions.assertThat(vertices.get(1).getOperatorIDs().size()).isEqualTo(5);
+        assertThat(vertices).hasSize(2);
+        assertThat(vertices.get(0).getOperatorIDs()).hasSize(2);
+        assertThat(vertices.get(1).getOperatorIDs()).hasSize(5);
     }
 
     /**
@@ -1060,7 +1021,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
      * chained to new sources, see FLINK-20444.
      */
     @Test
-    public void testYieldingOperatorProperlyChainedOnNewSources() {
+    void testYieldingOperatorProperlyChainedOnNewSources() {
         StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment(1);
 
         chainEnv.fromSource(
@@ -1073,12 +1034,12 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         final JobGraph jobGraph = chainEnv.getStreamGraph().getJobGraph();
 
         final List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
-        Assertions.assertThat(vertices.size()).isEqualTo(1);
-        Assertions.assertThat(vertices.get(0).getOperatorIDs().size()).isEqualTo(4);
+        assertThat(vertices).hasSize(1);
+        assertThat(vertices.get(0).getOperatorIDs()).hasSize(4);
     }
 
     @Test
-    public void testDeterministicUnionOrder() {
+    void testDeterministicUnionOrder() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
         JobGraph jobGraph = getUnionJobGraph(env);
@@ -1092,14 +1053,14 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
             JobGraph jobGraph2 = getUnionJobGraph(env);
             JobVertex jobSink2 =
                     Iterables.getLast(jobGraph2.getVerticesSortedTopologicallyFromSources());
-            Assertions.assertThat(jobSink)
+            assertThat(jobSink)
                     .withFailMessage("Different runs should yield different vertexes")
                     .isNotEqualTo(jobSink2);
             List<String> actualSourceOrder =
                     jobSink2.getInputs().stream()
                             .map(edge -> edge.getSource().getProducer().getName())
                             .collect(Collectors.toList());
-            Assertions.assertThat(actualSourceOrder)
+            assertThat(actualSourceOrder)
                     .withFailMessage("Union inputs reordered")
                     .isEqualTo(expectedSourceOrder);
         }
@@ -1121,7 +1082,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
     }
 
     @Test
-    public void testNotSupportInputSelectableOperatorIfCheckpointing() {
+    void testNotSupportInputSelectableOperatorIfCheckpointing() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.enableCheckpointing(60_000L);
 
@@ -1134,13 +1095,12 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                         new TestAnyModeReadingStreamOperator("test operator"))
                 .print();
 
-        Assertions.assertThatThrownBy(
-                        () -> StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph()))
+        assertThatThrownBy(() -> StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph()))
                 .isInstanceOf(UnsupportedOperationException.class);
     }
 
     @Test
-    public void testManagedMemoryFractionForUnknownResourceSpec() throws Exception {
+    void testManagedMemoryFractionForUnknownResourceSpec() throws Exception {
         final ResourceSpec resource = ResourceSpec.UNKNOWN;
         final List<ResourceSpec> resourceSpecs =
                 Arrays.asList(resource, resource, resource, resource);
@@ -1295,20 +1255,20 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
             double expectedStateBackendFrac,
             Configuration tmConfig) {
         final double delta = 0.000001;
-        Assertions.assertThat(
+        assertThat(
                         streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(
                                 ManagedMemoryUseCase.STATE_BACKEND,
                                 tmConfig,
                                 ClassLoader.getSystemClassLoader()))
                 .isCloseTo(expectedStateBackendFrac, Offset.offset(delta));
-        Assertions.assertThat(
+        assertThat(
                         streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(
                                 ManagedMemoryUseCase.PYTHON,
                                 tmConfig,
                                 ClassLoader.getSystemClassLoader()))
                 .isCloseTo(expectedPythonFrac, Offset.offset(delta));
 
-        Assertions.assertThat(
+        assertThat(
                         streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(
                                 ManagedMemoryUseCase.OPERATOR,
                                 tmConfig,
@@ -1354,7 +1314,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
     }
 
     @Test
-    public void testSlotSharingOnAllVerticesInSameSlotSharingGroupByDefaultEnabled() {
+    void testSlotSharingOnAllVerticesInSameSlotSharingGroupByDefaultEnabled() {
         final StreamGraph streamGraph = createStreamGraphForSlotSharingTest(new Configuration());
         // specify slot sharing group for map1
         streamGraph.getStreamNodes().stream()
@@ -1366,7 +1326,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
 
         final List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
-        Assertions.assertThat(verticesSorted.size()).isEqualTo(4);
+        assertThat(verticesSorted).hasSize(4);
 
         final List<JobVertex> verticesMatched = getExpectedVerticesList(verticesSorted);
         final JobVertex source1Vertex = verticesMatched.get(0);
@@ -1381,13 +1341,13 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
     }
 
     @Test
-    public void testSlotSharingOnAllVerticesInSameSlotSharingGroupByDefaultDisabled() {
+    void testSlotSharingOnAllVerticesInSameSlotSharingGroupByDefaultDisabled() {
         final StreamGraph streamGraph = createStreamGraphForSlotSharingTest(new Configuration());
         streamGraph.setAllVerticesInSameSlotSharingGroupByDefault(false);
         final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
 
         final List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
-        Assertions.assertThat(verticesSorted.size()).isEqualTo(4);
+        assertThat(verticesSorted).hasSize(4);
 
         final List<JobVertex> verticesMatched = getExpectedVerticesList(verticesSorted);
         final JobVertex source1Vertex = verticesMatched.get(0);
@@ -1400,7 +1360,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
     }
 
     @Test
-    public void testSlotSharingResourceConfiguration() {
+    void testSlotSharingResourceConfiguration() {
         final String slotSharingGroup1 = "slot-a";
         final String slotSharingGroup2 = "slot-b";
         final ResourceProfile resourceProfile1 = ResourceProfile.fromResources(1, 10);
@@ -1431,25 +1391,25 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         for (JobVertex jobVertex : jobGraph.getVertices()) {
             numVertex += 1;
             if (jobVertex.getName().contains(slotSharingGroup1)) {
-                Assertions.assertThat(resourceProfile1)
-                        .isEqualTo(jobVertex.getSlotSharingGroup().getResourceProfile());
+                assertThat(jobVertex.getSlotSharingGroup().getResourceProfile())
+                        .isEqualTo(resourceProfile1);
             } else if (jobVertex.getName().contains(slotSharingGroup2)) {
-                Assertions.assertThat(resourceProfile2)
-                        .isEqualTo(jobVertex.getSlotSharingGroup().getResourceProfile());
+                assertThat(jobVertex.getSlotSharingGroup().getResourceProfile())
+                        .isEqualTo(resourceProfile2);
             } else if (jobVertex
                     .getName()
                     .contains(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP)) {
-                Assertions.assertThat(resourceProfile3)
-                        .isEqualTo(jobVertex.getSlotSharingGroup().getResourceProfile());
+                assertThat(jobVertex.getSlotSharingGroup().getResourceProfile())
+                        .isEqualTo(resourceProfile3);
             } else {
                 Assertions.fail("");
             }
         }
-        assertThat(numVertex, is(3));
+        assertThat(numVertex).isEqualTo(3);
     }
 
     @Test
-    public void testSlotSharingResourceConfigurationWithDefaultSlotSharingGroup() {
+    void testSlotSharingResourceConfigurationWithDefaultSlotSharingGroup() {
         final ResourceProfile resourceProfile = ResourceProfile.fromResources(1, 10);
         final Map<String, ResourceProfile> slotSharingGroupResource = new HashMap<>();
         slotSharingGroupResource.put(
@@ -1465,23 +1425,22 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         int numVertex = 0;
         for (JobVertex jobVertex : jobGraph.getVertices()) {
             numVertex += 1;
-            Assertions.assertThat(resourceProfile)
-                    .isEqualTo(jobVertex.getSlotSharingGroup().getResourceProfile());
+            assertThat(jobVertex.getSlotSharingGroup().getResourceProfile())
+                    .isEqualTo(resourceProfile);
         }
-        assertThat(numVertex, is(2));
+        assertThat(numVertex).isEqualTo(2);
     }
 
     @Test
-    public void testNamingOfChainedMultipleInputs() {
+    void testNamingOfChainedMultipleInputs() {
         String[] sources = new String[] {"source-1", "source-2", "source-3"};
         JobGraph graph = createGraphWithMultipleInputs(true, sources);
         JobVertex head = graph.getVerticesSortedTopologicallyFromSources().iterator().next();
-        Assertions.assertThat(sources)
-                .allMatch(source -> head.getOperatorPrettyName().contains(source));
+        assertThat(sources).allMatch(source -> head.getOperatorPrettyName().contains(source));
     }
 
     @Test
-    public void testNamingOfNonChainedMultipleInputs() {
+    void testNamingOfNonChainedMultipleInputs() {
         String[] sources = new String[] {"source-1", "source-2", "source-3"};
         JobGraph graph = createGraphWithMultipleInputs(false, sources);
         JobVertex head =
@@ -1490,12 +1449,10 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                         vertex ->
                                 vertex.getInvokableClassName()
                                         .equals(MultipleInputStreamTask.class.getName()));
-        Assertions.assertThat(head.getName().contains("source-1"))
-                .withFailMessage(head.getName())
-                .isFalse();
-        Assertions.assertThat(head.getOperatorPrettyName().contains("source-1"))
+        assertThat(head.getName()).withFailMessage(head.getName()).doesNotContain("source-1");
+        assertThat(head.getOperatorPrettyName())
                 .withFailMessage(head.getOperatorPrettyName())
-                .isFalse();
+                .doesNotContain("source-1");
     }
 
     public JobGraph createGraphWithMultipleInputs(boolean chain, String... inputNames) {
@@ -1521,12 +1478,12 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
     }
 
     @Test
-    public void testTreeDescription() {
+    void testTreeDescription() {
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         JobGraph job = createJobGraphWithDescription(env, "test source");
         JobVertex[] allVertices = job.getVerticesAsArray();
-        Assertions.assertThat(allVertices.length).isEqualTo(1);
-        Assertions.assertThat(allVertices[0].getOperatorPrettyName())
+        assertThat(allVertices).hasSize(1);
+        assertThat(allVertices[0].getOperatorPrettyName())
                 .isEqualTo(
                         "test source\n"
                                 + ":- x + 1\n"
@@ -1538,12 +1495,12 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
     }
 
     @Test
-    public void testTreeDescriptionWithChainedSource() {
+    void testTreeDescriptionWithChainedSource() {
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         JobGraph job = createJobGraphWithDescription(env, "test source 1", "test source 2");
         JobVertex[] allVertices = job.getVerticesAsArray();
-        Assertions.assertThat(allVertices.length).isEqualTo(1);
-        Assertions.assertThat(allVertices[0].getOperatorPrettyName())
+        assertThat(allVertices).hasSize(1);
+        assertThat(allVertices[0].getOperatorPrettyName())
                 .isEqualTo(
                         "operator chained with source [test source 1, test source 2]\n"
                                 + ":- x + 1\n"
@@ -1555,7 +1512,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
     }
 
     @Test
-    public void testCascadingDescription() {
+    void testCascadingDescription() {
         final Configuration config = new Configuration();
         config.set(
                 PipelineOptions.VERTEX_DESCRIPTION_MODE,
@@ -1564,14 +1521,14 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                 StreamExecutionEnvironment.getExecutionEnvironment(config);
         JobGraph job = createJobGraphWithDescription(env, "test source");
         JobVertex[] allVertices = job.getVerticesAsArray();
-        Assertions.assertThat(allVertices.length).isEqualTo(1);
-        Assertions.assertThat(allVertices[0].getOperatorPrettyName())
+        assertThat(allVertices).hasSize(1);
+        assertThat(allVertices[0].getOperatorPrettyName())
                 .isEqualTo(
                         "test source -> (x + 1 -> (first print of map1 , second print of map1) , x + 2 -> (first print of map2 , second print of map2))");
     }
 
     @Test
-    public void testCascadingDescriptionWithChainedSource() {
+    void testCascadingDescriptionWithChainedSource() {
         final Configuration config = new Configuration();
         config.set(
                 PipelineOptions.VERTEX_DESCRIPTION_MODE,
@@ -1580,38 +1537,38 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                 StreamExecutionEnvironment.getExecutionEnvironment(config);
         JobGraph job = createJobGraphWithDescription(env, "test source 1", "test source 2");
         JobVertex[] allVertices = job.getVerticesAsArray();
-        Assertions.assertThat(allVertices.length).isEqualTo(1);
-        Assertions.assertThat(allVertices[0].getOperatorPrettyName())
+        assertThat(allVertices).hasSize(1);
+        assertThat(allVertices[0].getOperatorPrettyName())
                 .isEqualTo(
                         "operator chained with source [test source 1, test source 2] -> (x + 1 -> (first print of map1 , second print of map1) , x + 2 -> (first print of map2 , second print of map2))");
     }
 
     @Test
-    public void testNamingWithoutIndex() {
+    void testNamingWithoutIndex() {
         JobGraph job = createStreamGraphForSlotSharingTest(new Configuration()).getJobGraph();
         List<JobVertex> allVertices = job.getVerticesSortedTopologicallyFromSources();
-        Assertions.assertThat(allVertices.size()).isEqualTo(4);
-        Assertions.assertThat(allVertices.get(0).getName()).isEqualTo("Source: source1");
-        Assertions.assertThat(allVertices.get(1).getName()).isEqualTo("Source: source2");
-        Assertions.assertThat(allVertices.get(2).getName()).isEqualTo("map1");
-        Assertions.assertThat(allVertices.get(3).getName()).isEqualTo("map2");
+        assertThat(allVertices).hasSize(4);
+        assertThat(allVertices.get(0).getName()).isEqualTo("Source: source1");
+        assertThat(allVertices.get(1).getName()).isEqualTo("Source: source2");
+        assertThat(allVertices.get(2).getName()).isEqualTo("map1");
+        assertThat(allVertices.get(3).getName()).isEqualTo("map2");
     }
 
     @Test
-    public void testNamingWithIndex() {
+    void testNamingWithIndex() {
         Configuration config = new Configuration();
         config.setBoolean(PipelineOptions.VERTEX_NAME_INCLUDE_INDEX_PREFIX, true);
         JobGraph job = createStreamGraphForSlotSharingTest(config).getJobGraph();
         List<JobVertex> allVertices = job.getVerticesSortedTopologicallyFromSources();
-        Assertions.assertThat(allVertices.size()).isEqualTo(4);
-        Assertions.assertThat(allVertices.get(0).getName()).isEqualTo("[vertex-0]Source: source1");
-        Assertions.assertThat(allVertices.get(1).getName()).isEqualTo("[vertex-1]Source: source2");
-        Assertions.assertThat(allVertices.get(2).getName()).isEqualTo("[vertex-2]map1");
-        Assertions.assertThat(allVertices.get(3).getName()).isEqualTo("[vertex-3]map2");
+        assertThat(allVertices).hasSize(4);
+        assertThat(allVertices.get(0).getName()).isEqualTo("[vertex-0]Source: source1");
+        assertThat(allVertices.get(1).getName()).isEqualTo("[vertex-1]Source: source2");
+        assertThat(allVertices.get(2).getName()).isEqualTo("[vertex-2]map1");
+        assertThat(allVertices.get(3).getName()).isEqualTo("[vertex-3]map2");
     }
 
     @Test
-    public void testCacheJobGraph() throws Throwable {
+    void testCacheJobGraph() throws Throwable {
         final TestingStreamExecutionEnvironment env = new TestingStreamExecutionEnvironment();
         env.setParallelism(2);
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
@@ -1619,8 +1576,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         DataStream<Integer> source = env.fromElements(1, 2, 3).name("source");
         CachedDataStream<Integer> cachedStream =
                 source.map(i -> i + 1).name("map-1").map(i -> i + 1).name("map-2").cache();
-        Assertions.assertThat(cachedStream.getTransformation())
-                .isInstanceOf(CacheTransformation.class);
+        assertThat(cachedStream.getTransformation()).isInstanceOf(CacheTransformation.class);
         CacheTransformation<Integer> cacheTransformation =
                 (CacheTransformation<Integer>) cachedStream.getTransformation();
 
@@ -1628,7 +1584,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 
         JobGraph jobGraph = env.getStreamGraph().getJobGraph();
         List<JobVertex> allVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
-        Assertions.assertThat(allVertices.size()).isEqualTo(3);
+        assertThat(allVertices).hasSize(3);
 
         final JobVertex cacheWriteVertex =
                 allVertices.stream()
@@ -1641,13 +1597,13 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                                                         "CacheWrite job vertex not found"));
 
         final List<JobEdge> inputs = cacheWriteVertex.getInputs();
-        Assertions.assertThat(inputs.size()).isEqualTo(1);
-        Assertions.assertThat(inputs.get(0).getDistributionPattern()).isEqualTo(POINTWISE);
-        Assertions.assertThat(inputs.get(0).getSource().getResultType())
+        assertThat(inputs).hasSize(1);
+        assertThat(inputs.get(0).getDistributionPattern()).isEqualTo(POINTWISE);
+        assertThat(inputs.get(0).getSource().getResultType())
                 .isEqualTo(ResultPartitionType.BLOCKING_PERSISTENT);
-        Assertions.assertThat(new AbstractID(inputs.get(0).getSourceId()))
+        assertThat(new AbstractID(inputs.get(0).getSourceId()))
                 .isEqualTo(cacheTransformation.getDatasetId());
-        Assertions.assertThat(inputs.get(0).getSource().getProducer().getName())
+        assertThat(inputs.get(0).getSource().getProducer().getName())
                 .isEqualTo("map-1 -> map-2 -> Sink: print");
 
         env.addCompletedClusterDataset(cacheTransformation.getDatasetId());
@@ -1655,13 +1611,10 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 
         jobGraph = env.getStreamGraph().getJobGraph();
         allVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
-        Assertions.assertThat(allVertices.size()).isEqualTo(1);
-        Assertions.assertThat(allVertices.get(0).getName()).isEqualTo("CacheRead -> Sink: print");
-        Assertions.assertThat(allVertices.get(0).getIntermediateDataSetIdsToConsume().size())
-                .isEqualTo(1);
-        Assertions.assertThat(
-                        new AbstractID(
-                                allVertices.get(0).getIntermediateDataSetIdsToConsume().get(0)))
+        assertThat(allVertices).hasSize(1);
+        assertThat(allVertices.get(0).getName()).isEqualTo("CacheRead -> Sink: print");
+        assertThat(allVertices.get(0).getIntermediateDataSetIdsToConsume()).hasSize(1);
+        assertThat(new AbstractID(allVertices.get(0).getIntermediateDataSetIdsToConsume().get(0)))
                 .isEqualTo(cacheTransformation.getDatasetId());
     }
 
@@ -1670,7 +1623,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
      * dataset if they have the same parallelism and partitioner.
      */
     @Test
-    public void testIntermediateDataSetReuse() {
+    void testIntermediateDataSetReuse() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setBufferTimeout(-1);
         DataStream<Integer> source = env.fromElements(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
@@ -1700,25 +1653,25 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
 
         List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
-        Assertions.assertThat(vertices.size()).isEqualTo(9);
+        assertThat(vertices).hasSize(9);
 
         JobVertex sourceVertex = vertices.get(0);
         List<IntermediateDataSetID> producedDataSet =
                 sourceVertex.getProducedDataSets().stream()
                         .map(IntermediateDataSet::getId)
                         .collect(Collectors.toList());
-        Assertions.assertThat(producedDataSet.size()).isEqualTo(6);
+        assertThat(producedDataSet).hasSize(6);
 
         JobVertex sinkVertex1 = checkNotNull(findJobVertexWithName(vertices, "sink1"));
         JobVertex sinkVertex2 = checkNotNull(findJobVertexWithName(vertices, "sink2"));
         JobVertex sinkVertex3 = checkNotNull(findJobVertexWithName(vertices, "sink3"));
         JobVertex sinkVertex4 = checkNotNull(findJobVertexWithName(vertices, "sink4"));
 
-        Assertions.assertThat(sinkVertex2.getInputs().get(0).getSource().getId())
+        assertThat(sinkVertex2.getInputs().get(0).getSource().getId())
                 .isEqualTo(sinkVertex1.getInputs().get(0).getSource().getId());
-        Assertions.assertThat(sinkVertex4.getInputs().get(0).getSource().getId())
+        assertThat(sinkVertex4.getInputs().get(0).getSource().getId())
                 .isEqualTo(sinkVertex3.getInputs().get(0).getSource().getId());
-        Assertions.assertThat(sinkVertex3.getInputs().get(0).getSource().getId())
+        assertThat(sinkVertex3.getInputs().get(0).getSource().getId())
                 .isNotEqualTo(sinkVertex1.getInputs().get(0).getSource().getId());
 
         StreamConfig streamConfig = new StreamConfig(sourceVertex.getConfiguration());
@@ -1726,18 +1679,16 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                 streamConfig.getOperatorNonChainedOutputs(getClass().getClassLoader()).stream()
                         .map(NonChainedOutput::getDataSetId)
                         .collect(Collectors.toList());
-        Assertions.assertThat(nonChainedOutputs.size()).isEqualTo(5);
-        Assertions.assertThat(
-                        nonChainedOutputs.contains(
-                                sinkVertex3.getInputs().get(0).getSource().getId()))
-                .isFalse();
+        assertThat(nonChainedOutputs).hasSize(5);
+        assertThat(nonChainedOutputs)
+                .doesNotContain(sinkVertex3.getInputs().get(0).getSource().getId());
 
         List<IntermediateDataSetID> streamOutputsInOrder =
                 streamConfig.getVertexNonChainedOutputs(getClass().getClassLoader()).stream()
                         .map(NonChainedOutput::getDataSetId)
                         .collect(Collectors.toList());
-        Assertions.assertThat(streamOutputsInOrder.size()).isEqualTo(6);
-        Assertions.assertThat(streamOutputsInOrder.toArray()).isEqualTo(producedDataSet.toArray());
+        assertThat(streamOutputsInOrder).hasSize(6);
+        assertThat(streamOutputsInOrder).isEqualTo(producedDataSet);
     }
 
     private static JobVertex findJobVertexWithName(List<JobVertex> vertices, String name) {
@@ -1841,7 +1792,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 
     private void assertSameSlotSharingGroup(JobVertex... vertices) {
         for (int i = 0; i < vertices.length - 1; i++) {
-            Assertions.assertThat(vertices[i + 1].getSlotSharingGroup())
+            assertThat(vertices[i + 1].getSlotSharingGroup())
                     .isEqualTo(vertices[i].getSlotSharingGroup());
         }
     }
@@ -1849,7 +1800,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
     private void assertDistinctSharingGroups(JobVertex... vertices) {
         for (int i = 0; i < vertices.length - 1; i++) {
             for (int j = i + 1; j < vertices.length; j++) {
-                Assertions.assertThat(vertices[i].getSlotSharingGroup())
+                assertThat(vertices[i].getSlotSharingGroup())
                         .isNotEqualTo(vertices[j].getSlotSharingGroup());
             }
         }