You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/09/28 09:23:24 UTC
[flink] 01/02: [hotfix] Migrate all tests in StreamingJobGraphGeneratorTest to AssertJ and make the test methods package-private
This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 32b95e7715c2eda0ff9523a593c798ba3b47c397
Author: Weijie Guo <re...@163.com>
AuthorDate: Wed Sep 28 11:47:41 2022 +0800
[hotfix] Migrate all tests in StreamingJobGraphGeneratorTest to AssertJ and make the test methods package-private
---
.../api/graph/StreamingJobGraphGeneratorTest.java | 449 +++++++++------------
1 file changed, 200 insertions(+), 249 deletions(-)
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 7da1c0e3697..dc534008158 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -100,15 +100,14 @@ import org.apache.flink.streaming.util.TestAnyModeReadingStreamOperator;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Collector;
import org.apache.flink.util.SerializedValue;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
import org.assertj.core.api.Assertions;
import org.assertj.core.data.Offset;
-import org.hamcrest.FeatureMatcher;
-import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import java.lang.reflect.Method;
import java.util.ArrayList;
@@ -126,17 +125,16 @@ import java.util.stream.Collectors;
import static org.apache.flink.runtime.jobgraph.DistributionPattern.POINTWISE;
import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.areOperatorsChainable;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
/** Tests for {@link StreamingJobGraphGenerator}. */
+@ExtendWith(TestLoggerExtension.class)
@SuppressWarnings("serial")
-public class StreamingJobGraphGeneratorTest extends TestLogger {
+class StreamingJobGraphGeneratorTest {
@Test
- public void testParallelismOneNotChained() {
+ void testParallelismOneNotChained() {
// --------- the program ---------
@@ -179,16 +177,16 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
JobGraph jobGraph = streamGraph.getJobGraph();
List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
- Assertions.assertThat(jobGraph.getNumberOfVertices()).isEqualTo(2);
- Assertions.assertThat(verticesSorted.get(0).getParallelism()).isEqualTo(1);
- Assertions.assertThat(verticesSorted.get(1).getParallelism()).isEqualTo(1);
+ assertThat(jobGraph.getNumberOfVertices()).isEqualTo(2);
+ assertThat(verticesSorted.get(0).getParallelism()).isEqualTo(1);
+ assertThat(verticesSorted.get(1).getParallelism()).isEqualTo(1);
JobVertex sourceVertex = verticesSorted.get(0);
JobVertex mapSinkVertex = verticesSorted.get(1);
- Assertions.assertThat(sourceVertex.getProducedDataSets().get(0).getResultType())
+ assertThat(sourceVertex.getProducedDataSets().get(0).getResultType())
.isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
- Assertions.assertThat(mapSinkVertex.getInputs().get(0).getSource().getResultType())
+ assertThat(mapSinkVertex.getInputs().get(0).getSource().getResultType())
.isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
}
@@ -197,40 +195,36 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
* checkpoint mode to {@link CheckpointingMode#AT_LEAST_ONCE}.
*/
@Test
- public void testDisabledCheckpointing() throws Exception {
+ void testDisabledCheckpointing() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(0).print();
StreamGraph streamGraph = env.getStreamGraph();
- Assertions.assertThat(streamGraph.getCheckpointConfig().isCheckpointingEnabled())
+ assertThat(streamGraph.getCheckpointConfig().isCheckpointingEnabled())
.withFailMessage("Checkpointing enabled")
.isFalse();
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
JobCheckpointingSettings snapshottingSettings = jobGraph.getCheckpointingSettings();
- Assertions.assertThat(
+ assertThat(
snapshottingSettings
.getCheckpointCoordinatorConfiguration()
.getCheckpointInterval())
.isEqualTo(Long.MAX_VALUE);
- Assertions.assertThat(
- snapshottingSettings
- .getCheckpointCoordinatorConfiguration()
- .isExactlyOnce())
+ assertThat(snapshottingSettings.getCheckpointCoordinatorConfiguration().isExactlyOnce())
.isFalse();
List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
StreamConfig streamConfig = new StreamConfig(verticesSorted.get(0).getConfiguration());
- Assertions.assertThat(streamConfig.getCheckpointMode())
- .isEqualTo(CheckpointingMode.AT_LEAST_ONCE);
+ assertThat(streamConfig.getCheckpointMode()).isEqualTo(CheckpointingMode.AT_LEAST_ONCE);
}
@Test
- public void testEnabledUnalignedCheckAndDisabledCheckpointing() {
+ void testEnabledUnalignedCheckAndDisabledCheckpointing() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(0).print();
StreamGraph streamGraph = env.getStreamGraph();
- Assertions.assertThat(streamGraph.getCheckpointConfig().isCheckpointingEnabled())
+ assertThat(streamGraph.getCheckpointConfig().isCheckpointingEnabled())
.withFailMessage("Checkpointing enabled")
.isFalse();
env.getCheckpointConfig().enableUnalignedCheckpoints(true);
@@ -239,13 +233,12 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
StreamConfig streamConfig = new StreamConfig(verticesSorted.get(0).getConfiguration());
- Assertions.assertThat(streamConfig.getCheckpointMode())
- .isEqualTo(CheckpointingMode.AT_LEAST_ONCE);
- Assertions.assertThat(streamConfig.isUnalignedCheckpointsEnabled()).isFalse();
+ assertThat(streamConfig.getCheckpointMode()).isEqualTo(CheckpointingMode.AT_LEAST_ONCE);
+ assertThat(streamConfig.isUnalignedCheckpointsEnabled()).isFalse();
}
@Test
- public void testUnalignedCheckAndAtLeastOnce() {
+ void testUnalignedCheckAndAtLeastOnce() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(0).print();
StreamGraph streamGraph = env.getStreamGraph();
@@ -256,13 +249,12 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
StreamConfig streamConfig = new StreamConfig(verticesSorted.get(0).getConfiguration());
- Assertions.assertThat(streamConfig.getCheckpointMode())
- .isEqualTo(CheckpointingMode.AT_LEAST_ONCE);
- Assertions.assertThat(streamConfig.isUnalignedCheckpointsEnabled()).isFalse();
+ assertThat(streamConfig.getCheckpointMode()).isEqualTo(CheckpointingMode.AT_LEAST_ONCE);
+ assertThat(streamConfig.isUnalignedCheckpointsEnabled()).isFalse();
}
@Test
- public void generatorForwardsSavepointRestoreSettings() {
+ void generatorForwardsSavepointRestoreSettings() {
StreamGraph streamGraph =
new StreamGraph(
new ExecutionConfig(),
@@ -272,12 +264,12 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
- assertThat(savepointRestoreSettings.getRestorePath(), is("hello"));
+ assertThat(savepointRestoreSettings.getRestorePath()).isEqualTo("hello");
}
/** Verifies that the chain start/end is correctly set. */
@Test
- public void testChainStartEndSetting() throws Exception {
+ void testChainStartEndSetting() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// set parallelism to 2 to avoid chaining with source in case when available processors is
@@ -300,9 +292,9 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
JobVertex sourceVertex = verticesSorted.get(0);
JobVertex mapPrintVertex = verticesSorted.get(1);
- Assertions.assertThat(sourceVertex.getProducedDataSets().get(0).getResultType())
+ assertThat(sourceVertex.getProducedDataSets().get(0).getResultType())
.isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
- Assertions.assertThat(mapPrintVertex.getInputs().get(0).getSource().getResultType())
+ assertThat(mapPrintVertex.getInputs().get(0).getSource().getResultType())
.isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
StreamConfig sourceConfig = new StreamConfig(sourceVertex.getConfiguration());
@@ -311,18 +303,18 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
mapConfig.getTransitiveChainedTaskConfigs(getClass().getClassLoader());
StreamConfig printConfig = chainedConfigs.values().iterator().next();
- Assertions.assertThat(sourceConfig.isChainStart()).isTrue();
- Assertions.assertThat(sourceConfig.isChainEnd()).isTrue();
+ assertThat(sourceConfig.isChainStart()).isTrue();
+ assertThat(sourceConfig.isChainEnd()).isTrue();
- Assertions.assertThat(mapConfig.isChainStart()).isTrue();
- Assertions.assertThat(mapConfig.isChainEnd()).isFalse();
+ assertThat(mapConfig.isChainStart()).isTrue();
+ assertThat(mapConfig.isChainEnd()).isFalse();
- Assertions.assertThat(printConfig.isChainStart()).isFalse();
- Assertions.assertThat(printConfig.isChainEnd()).isTrue();
+ assertThat(printConfig.isChainStart()).isFalse();
+ assertThat(printConfig.isChainEnd()).isTrue();
}
@Test
- public void testOperatorCoordinatorAddedToJobVertex() {
+ void testOperatorCoordinatorAddedToJobVertex() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> stream =
env.fromSource(
@@ -342,8 +334,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
- Assertions.assertThat(jobGraph.getVerticesAsArray()[0].getOperatorCoordinators().size())
- .isEqualTo(2);
+ assertThat(jobGraph.getVerticesAsArray()[0].getOperatorCoordinators()).hasSize(2);
}
/**
@@ -351,7 +342,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
* sink cases) when generating job graph.
*/
@Test
- public void testResourcesForChainedSourceSink() throws Exception {
+ void testResourcesForChainedSourceSink() throws Exception {
ResourceSpec resource1 = ResourceSpec.newBuilder(0.1, 100).build();
ResourceSpec resource2 = ResourceSpec.newBuilder(0.2, 200).build();
ResourceSpec resource3 = ResourceSpec.newBuilder(0.3, 300).build();
@@ -425,14 +416,10 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
JobVertex reduceSinkVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
- Assertions.assertThat(
- sourceMapFilterVertex
- .getMinResources()
- .equals(resource3.merge(resource2).merge(resource1)))
- .isTrue();
- Assertions.assertThat(
- reduceSinkVertex.getPreferredResources().equals(resource4.merge(resource5)))
- .isTrue();
+ assertThat(sourceMapFilterVertex.getMinResources())
+ .isEqualTo(resource3.merge(resource2).merge(resource1));
+
+ assertThat(reduceSinkVertex.getPreferredResources()).isEqualTo(resource4.merge(resource5));
}
/**
@@ -440,7 +427,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
* chaining and iteration cases) when generating job graph.
*/
@Test
- public void testResourcesForIteration() throws Exception {
+ void testResourcesForIteration() throws Exception {
ResourceSpec resource1 = ResourceSpec.newBuilder(0.1, 100).build();
ResourceSpec resource2 = ResourceSpec.newBuilder(0.2, 200).build();
ResourceSpec resource3 = ResourceSpec.newBuilder(0.3, 300).build();
@@ -508,25 +495,21 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
for (JobVertex jobVertex : jobGraph.getVertices()) {
if (jobVertex.getName().contains("test_source")) {
- Assertions.assertThat(jobVertex.getMinResources().equals(resource1)).isTrue();
+ assertThat(jobVertex.getMinResources()).isEqualTo(resource1);
} else if (jobVertex.getName().contains("Iteration_Source")) {
- Assertions.assertThat(jobVertex.getPreferredResources().equals(resource2)).isTrue();
+ assertThat(jobVertex.getPreferredResources()).isEqualTo(resource2);
} else if (jobVertex.getName().contains("test_flatMap")) {
- Assertions.assertThat(
- jobVertex.getMinResources().equals(resource3.merge(resource4)))
- .isTrue();
+ assertThat(jobVertex.getMinResources()).isEqualTo(resource3.merge(resource4));
} else if (jobVertex.getName().contains("Iteration_Tail")) {
- Assertions.assertThat(
- jobVertex.getPreferredResources().equals(ResourceSpec.DEFAULT))
- .isTrue();
+ assertThat(jobVertex.getPreferredResources()).isEqualTo(ResourceSpec.DEFAULT);
} else if (jobVertex.getName().contains("test_sink")) {
- Assertions.assertThat(jobVertex.getMinResources().equals(resource5)).isTrue();
+ assertThat(jobVertex.getMinResources()).isEqualTo(resource5);
}
}
}
@Test
- public void testInputOutputFormat() {
+ void testInputOutputFormat() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Long> source =
@@ -543,10 +526,10 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
StreamGraph streamGraph = env.getStreamGraph();
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
- Assertions.assertThat(jobGraph.getNumberOfVertices()).isEqualTo(1);
+ assertThat(jobGraph.getNumberOfVertices()).isEqualTo(1);
JobVertex jobVertex = jobGraph.getVertices().iterator().next();
- Assertions.assertThat(jobVertex instanceof InputOutputFormatVertex).isTrue();
+ assertThat(jobVertex).isInstanceOf(InputOutputFormatVertex.class);
InputOutputFormatContainer formatContainer =
new InputOutputFormatContainer(
@@ -556,8 +539,8 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
formatContainer.getInputFormats();
Map<OperatorID, UserCodeWrapper<? extends OutputFormat<?>>> outputFormats =
formatContainer.getOutputFormats();
- Assertions.assertThat(inputFormats.size()).isEqualTo(1);
- Assertions.assertThat(outputFormats.size()).isEqualTo(2);
+ assertThat(inputFormats).hasSize(1);
+ assertThat(outputFormats).hasSize(2);
Map<String, OperatorID> nameToOperatorIds = new HashMap<>();
StreamConfig headConfig = new StreamConfig(jobVertex.getConfiguration());
@@ -572,19 +555,19 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
InputFormat<?, ?> sourceFormat =
inputFormats.get(nameToOperatorIds.get("Source: source")).getUserCodeObject();
- Assertions.assertThat(sourceFormat instanceof TypeSerializerInputFormat).isTrue();
+ assertThat(sourceFormat).isInstanceOf(TypeSerializerInputFormat.class);
OutputFormat<?> sinkFormat1 =
outputFormats.get(nameToOperatorIds.get("Sink: sink1")).getUserCodeObject();
- Assertions.assertThat(sinkFormat1 instanceof DiscardingOutputFormat).isTrue();
+ assertThat(sinkFormat1).isInstanceOf(DiscardingOutputFormat.class);
OutputFormat<?> sinkFormat2 =
outputFormats.get(nameToOperatorIds.get("Sink: sink2")).getUserCodeObject();
- Assertions.assertThat(sinkFormat2 instanceof DiscardingOutputFormat).isTrue();
+ assertThat(sinkFormat2).isInstanceOf(DiscardingOutputFormat.class);
}
@Test
- public void testCoordinatedOperator() {
+ void testCoordinatedOperator() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> source =
env.fromSource(
@@ -596,26 +579,26 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
StreamGraph streamGraph = env.getStreamGraph();
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
// There should be only one job vertex.
- Assertions.assertThat(jobGraph.getNumberOfVertices()).isEqualTo(1);
+ assertThat(jobGraph.getNumberOfVertices()).isEqualTo(1);
JobVertex jobVertex = jobGraph.getVerticesAsArray()[0];
List<SerializedValue<OperatorCoordinator.Provider>> coordinatorProviders =
jobVertex.getOperatorCoordinators();
// There should be only one coordinator provider.
- Assertions.assertThat(coordinatorProviders.size()).isEqualTo(1);
+ assertThat(coordinatorProviders).hasSize(1);
// The invokable class should be SourceOperatorStreamTask.
final ClassLoader classLoader = getClass().getClassLoader();
- Assertions.assertThat(jobVertex.getInvokableClass(classLoader))
+ assertThat(jobVertex.getInvokableClass(classLoader))
.isEqualTo(SourceOperatorStreamTask.class);
StreamOperatorFactory operatorFactory =
new StreamConfig(jobVertex.getConfiguration())
.getStreamOperatorFactory(classLoader);
- Assertions.assertThat(operatorFactory instanceof SourceOperatorFactory).isTrue();
+ assertThat(operatorFactory).isInstanceOf(SourceOperatorFactory.class);
}
/** Test setting exchange mode to {@link StreamExchangeMode#PIPELINED}. */
@Test
- public void testExchangeModePipelined() {
+ void testExchangeModePipelined() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// fromElements -> Map -> Print
DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 3);
@@ -642,19 +625,19 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
- Assertions.assertThat(verticesSorted.size()).isEqualTo(2);
+ assertThat(verticesSorted).hasSize(2);
// it can be chained with PIPELINED exchange mode
JobVertex sourceAndMapVertex = verticesSorted.get(0);
// PIPELINED exchange mode is translated into PIPELINED_BOUNDED result partition
- Assertions.assertThat(sourceAndMapVertex.getProducedDataSets().get(0).getResultType())
+ assertThat(sourceAndMapVertex.getProducedDataSets().get(0).getResultType())
.isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
}
/** Test setting exchange mode to {@link StreamExchangeMode#BATCH}. */
@Test
- public void testExchangeModeBatch() {
+ void testExchangeModeBatch() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.setBufferTimeout(-1);
@@ -683,22 +666,22 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
- Assertions.assertThat(verticesSorted.size()).isEqualTo(3);
+ assertThat(verticesSorted).hasSize(3);
// it can not be chained with BATCH exchange mode
JobVertex sourceVertex = verticesSorted.get(0);
JobVertex mapVertex = verticesSorted.get(1);
// BATCH exchange mode is translated into BLOCKING result partition
- Assertions.assertThat(sourceVertex.getProducedDataSets().get(0).getResultType())
+ assertThat(sourceVertex.getProducedDataSets().get(0).getResultType())
.isEqualTo(ResultPartitionType.BLOCKING);
- Assertions.assertThat(mapVertex.getProducedDataSets().get(0).getResultType())
+ assertThat(mapVertex.getProducedDataSets().get(0).getResultType())
.isEqualTo(ResultPartitionType.BLOCKING);
}
/** Test setting exchange mode to {@link StreamExchangeMode#UNDEFINED}. */
@Test
- public void testExchangeModeUndefined() {
+ void testExchangeModeUndefined() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// fromElements -> Map -> Print
DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 3);
@@ -725,13 +708,13 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
- Assertions.assertThat(verticesSorted.size()).isEqualTo(2);
+ assertThat(verticesSorted).hasSize(2);
// it can be chained with UNDEFINED exchange mode
JobVertex sourceAndMapVertex = verticesSorted.get(0);
// UNDEFINED exchange mode is translated into PIPELINED_BOUNDED result partition by default
- Assertions.assertThat(sourceAndMapVertex.getProducedDataSets().get(0).getResultType())
+ assertThat(sourceAndMapVertex.getProducedDataSets().get(0).getResultType())
.isEqualTo(ResultPartitionType.PIPELINED_BOUNDED);
}
@@ -764,13 +747,13 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
- Assertions.assertThat(verticesSorted.size()).isEqualTo(2);
+ assertThat(verticesSorted).hasSize(2);
// it can be chained with HYBRID_FULL exchange mode
JobVertex sourceAndMapVertex = verticesSorted.get(0);
// HYBRID_FULL exchange mode is translated into HYBRID_FULL result partition
- Assertions.assertThat(sourceAndMapVertex.getProducedDataSets().get(0).getResultType())
+ assertThat(sourceAndMapVertex.getProducedDataSets().get(0).getResultType())
.isEqualTo(ResultPartitionType.HYBRID_FULL);
}
@@ -803,35 +786,35 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
- Assertions.assertThat(verticesSorted.size()).isEqualTo(2);
+ assertThat(verticesSorted).hasSize(2);
// it can be chained with HYBRID_SELECTIVE exchange mode
JobVertex sourceAndMapVertex = verticesSorted.get(0);
// HYBRID_SELECTIVE exchange mode is translated into HYBRID_SELECTIVE result partition
- Assertions.assertThat(sourceAndMapVertex.getProducedDataSets().get(0).getResultType())
+ assertThat(sourceAndMapVertex.getProducedDataSets().get(0).getResultType())
.isEqualTo(ResultPartitionType.HYBRID_SELECTIVE);
}
@Test
- public void testStreamingJobTypeByDefault() {
+ void testStreamingJobTypeByDefault() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements("test").addSink(new DiscardingSink<>());
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
- Assertions.assertThat(jobGraph.getJobType()).isEqualTo(JobType.STREAMING);
+ assertThat(jobGraph.getJobType()).isEqualTo(JobType.STREAMING);
}
@Test
- public void testBatchJobType() {
+ void testBatchJobType() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.fromElements("test").addSink(new DiscardingSink<>());
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
- Assertions.assertThat(jobGraph.getJobType()).isEqualTo(JobType.BATCH);
+ assertThat(jobGraph.getJobType()).isEqualTo(JobType.BATCH);
}
@Test
- public void testPartitionTypesInBatchMode() {
+ void testPartitionTypesInBatchMode() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.setParallelism(4);
@@ -851,35 +834,25 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
- assertThat(
- verticesSorted.get(0) /* source - forward */,
- hasOutputPartitionType(ResultPartitionType.BLOCKING));
- assertThat(
- verticesSorted.get(1) /* rescale */,
- hasOutputPartitionType(ResultPartitionType.BLOCKING));
- assertThat(
- verticesSorted.get(2) /* rebalance */,
- hasOutputPartitionType(ResultPartitionType.BLOCKING));
- assertThat(
- verticesSorted.get(3) /* keyBy */,
- hasOutputPartitionType(ResultPartitionType.BLOCKING));
- assertThat(
- verticesSorted.get(4) /* forward - sink */,
- hasOutputPartitionType(ResultPartitionType.BLOCKING));
+ assertHasOutputPartitionType(
+ verticesSorted.get(0) /* source - forward */, ResultPartitionType.BLOCKING);
+ assertHasOutputPartitionType(
+ verticesSorted.get(1) /* rescale */, ResultPartitionType.BLOCKING);
+ assertHasOutputPartitionType(
+ verticesSorted.get(2) /* rebalance */, ResultPartitionType.BLOCKING);
+ assertHasOutputPartitionType(
+ verticesSorted.get(3) /* keyBy */, ResultPartitionType.BLOCKING);
+ assertHasOutputPartitionType(
+ verticesSorted.get(4) /* forward - sink */, ResultPartitionType.BLOCKING);
}
- private Matcher<JobVertex> hasOutputPartitionType(ResultPartitionType partitionType) {
- return new FeatureMatcher<JobVertex, ResultPartitionType>(
- equalTo(partitionType), "output partition type", "output partition type") {
- @Override
- protected ResultPartitionType featureValueOf(JobVertex actual) {
- return actual.getProducedDataSets().get(0).getResultType();
- }
- };
+ private void assertHasOutputPartitionType(
+ JobVertex jobVertex, ResultPartitionType partitionType) {
+ assertThat(jobVertex.getProducedDataSets().get(0).getResultType()).isEqualTo(partitionType);
}
@Test
- public void testNormalExchangeModeWithBufferTimeout() {
+ void testNormalExchangeModeWithBufferTimeout() {
testCompatibleExchangeModeWithBufferTimeout(StreamExchangeMode.PIPELINED);
}
@@ -901,7 +874,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
}
@Test
- public void testDisablingBufferTimeoutWithPipelinedExchanges() {
+ void testDisablingBufferTimeoutWithPipelinedExchanges() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.setBufferTimeout(-1);
@@ -913,14 +886,14 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
final StreamConfig streamConfig = new StreamConfig(vertex.getConfiguration());
for (NonChainedOutput output :
streamConfig.getVertexNonChainedOutputs(this.getClass().getClassLoader())) {
- assertThat(output.getBufferTimeout(), equalTo(-1L));
+ assertThat(output.getBufferTimeout()).isEqualTo(-1L);
}
}
}
/** Test iteration job, check slot sharing group and co-location group. */
@Test
- public void testIteration() {
+ void testIteration() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> source = env.fromElements(1, 2, 3).name("source");
@@ -933,45 +906,37 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
SlotSharingGroup slotSharingGroup = jobGraph.getVerticesAsArray()[0].getSlotSharingGroup();
- Assertions.assertThat(slotSharingGroup).isNotNull();
+ assertThat(slotSharingGroup).isNotNull();
CoLocationGroup iterationSourceCoLocationGroup = null;
CoLocationGroup iterationSinkCoLocationGroup = null;
for (JobVertex jobVertex : jobGraph.getVertices()) {
// all vertices have same slot sharing group by default
- Assertions.assertThat(jobVertex.getSlotSharingGroup()).isEqualTo(slotSharingGroup);
+ assertThat(jobVertex.getSlotSharingGroup()).isEqualTo(slotSharingGroup);
// all iteration vertices have same co-location group,
// others have no co-location group by default
if (jobVertex.getName().startsWith(StreamGraph.ITERATION_SOURCE_NAME_PREFIX)) {
iterationSourceCoLocationGroup = jobVertex.getCoLocationGroup();
- Assertions.assertThat(
- iterationSourceCoLocationGroup
- .getVertexIds()
- .contains(jobVertex.getID()))
- .isTrue();
+ assertThat(iterationSourceCoLocationGroup.getVertexIds())
+ .contains(jobVertex.getID());
} else if (jobVertex.getName().startsWith(StreamGraph.ITERATION_SINK_NAME_PREFIX)) {
iterationSinkCoLocationGroup = jobVertex.getCoLocationGroup();
- Assertions.assertThat(
- iterationSinkCoLocationGroup
- .getVertexIds()
- .contains(jobVertex.getID()))
- .isTrue();
+ assertThat(iterationSinkCoLocationGroup.getVertexIds()).contains(jobVertex.getID());
} else {
- Assertions.assertThat(jobVertex.getCoLocationGroup()).isNull();
+ assertThat(jobVertex.getCoLocationGroup()).isNull();
}
}
- Assertions.assertThat(iterationSourceCoLocationGroup).isNotNull();
- Assertions.assertThat(iterationSinkCoLocationGroup).isNotNull();
- Assertions.assertThat(iterationSinkCoLocationGroup)
- .isEqualTo(iterationSourceCoLocationGroup);
+ assertThat(iterationSourceCoLocationGroup).isNotNull();
+ assertThat(iterationSinkCoLocationGroup).isNotNull();
+ assertThat(iterationSinkCoLocationGroup).isEqualTo(iterationSourceCoLocationGroup);
}
/** Test default job type. */
@Test
- public void testDefaultJobType() {
+ void testDefaultJobType() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamGraph streamGraph =
@@ -979,11 +944,11 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
Collections.emptyList(), env.getConfig(), env.getCheckpointConfig())
.generate();
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
- Assertions.assertThat(jobGraph.getJobType()).isEqualTo(JobType.STREAMING);
+ assertThat(jobGraph.getJobType()).isEqualTo(JobType.STREAMING);
}
@Test
- public void testYieldingOperatorNotChainableToTaskChainedToLegacySource() {
+ void testYieldingOperatorNotChainableToTaskChainedToLegacySource() {
StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment(1);
chainEnv.fromElements(1)
@@ -998,16 +963,14 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
streamGraph.getStreamNodes().stream()
.sorted(Comparator.comparingInt(StreamNode::getId))
.collect(Collectors.toList());
- Assertions.assertThat(
- areOperatorsChainable(streamNodes.get(0), streamNodes.get(1), streamGraph))
+ assertThat(areOperatorsChainable(streamNodes.get(0), streamNodes.get(1), streamGraph))
.isTrue();
- Assertions.assertThat(
- areOperatorsChainable(streamNodes.get(1), streamNodes.get(2), streamGraph))
+ assertThat(areOperatorsChainable(streamNodes.get(1), streamNodes.get(2), streamGraph))
.isFalse();
}
@Test
- public void testYieldingOperatorChainableToTaskNotChainedToLegacySource() {
+ void testYieldingOperatorChainableToTaskNotChainedToLegacySource() {
StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment(1);
chainEnv.fromElements(1)
@@ -1022,11 +985,9 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
streamGraph.getStreamNodes().stream()
.sorted(Comparator.comparingInt(StreamNode::getId))
.collect(Collectors.toList());
- Assertions.assertThat(
- areOperatorsChainable(streamNodes.get(0), streamNodes.get(1), streamGraph))
+ assertThat(areOperatorsChainable(streamNodes.get(0), streamNodes.get(1), streamGraph))
.isFalse();
- Assertions.assertThat(
- areOperatorsChainable(streamNodes.get(1), streamNodes.get(2), streamGraph))
+ assertThat(areOperatorsChainable(streamNodes.get(1), streamNodes.get(2), streamGraph))
.isTrue();
}
@@ -1035,7 +996,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
* chained to legacy sources, see FLINK-16219.
*/
@Test
- public void testYieldingOperatorProperlyChainedOnLegacySources() {
+ void testYieldingOperatorProperlyChainedOnLegacySources() {
StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment(1);
chainEnv.fromElements(1)
@@ -1050,9 +1011,9 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
final JobGraph jobGraph = chainEnv.getStreamGraph().getJobGraph();
final List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
- Assertions.assertThat(vertices.size()).isEqualTo(2);
- Assertions.assertThat(vertices.get(0).getOperatorIDs().size()).isEqualTo(2);
- Assertions.assertThat(vertices.get(1).getOperatorIDs().size()).isEqualTo(5);
+ assertThat(vertices).hasSize(2);
+ assertThat(vertices.get(0).getOperatorIDs()).hasSize(2);
+ assertThat(vertices.get(1).getOperatorIDs()).hasSize(5);
}
/**
@@ -1060,7 +1021,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
* chained to new sources, see FLINK-20444.
*/
@Test
- public void testYieldingOperatorProperlyChainedOnNewSources() {
+ void testYieldingOperatorProperlyChainedOnNewSources() {
StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment(1);
chainEnv.fromSource(
@@ -1073,12 +1034,12 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
final JobGraph jobGraph = chainEnv.getStreamGraph().getJobGraph();
final List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
- Assertions.assertThat(vertices.size()).isEqualTo(1);
- Assertions.assertThat(vertices.get(0).getOperatorIDs().size()).isEqualTo(4);
+ assertThat(vertices).hasSize(1);
+ assertThat(vertices.get(0).getOperatorIDs()).hasSize(4);
}
@Test
- public void testDeterministicUnionOrder() {
+ void testDeterministicUnionOrder() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
JobGraph jobGraph = getUnionJobGraph(env);
@@ -1092,14 +1053,14 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
JobGraph jobGraph2 = getUnionJobGraph(env);
JobVertex jobSink2 =
Iterables.getLast(jobGraph2.getVerticesSortedTopologicallyFromSources());
- Assertions.assertThat(jobSink)
+ assertThat(jobSink)
.withFailMessage("Different runs should yield different vertexes")
.isNotEqualTo(jobSink2);
List<String> actualSourceOrder =
jobSink2.getInputs().stream()
.map(edge -> edge.getSource().getProducer().getName())
.collect(Collectors.toList());
- Assertions.assertThat(actualSourceOrder)
+ assertThat(actualSourceOrder)
.withFailMessage("Union inputs reordered")
.isEqualTo(expectedSourceOrder);
}
@@ -1121,7 +1082,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
}
@Test
- public void testNotSupportInputSelectableOperatorIfCheckpointing() {
+ void testNotSupportInputSelectableOperatorIfCheckpointing() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L);
@@ -1134,13 +1095,12 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
new TestAnyModeReadingStreamOperator("test operator"))
.print();
- Assertions.assertThatThrownBy(
- () -> StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph()))
+ assertThatThrownBy(() -> StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph()))
.isInstanceOf(UnsupportedOperationException.class);
}
@Test
- public void testManagedMemoryFractionForUnknownResourceSpec() throws Exception {
+ void testManagedMemoryFractionForUnknownResourceSpec() throws Exception {
final ResourceSpec resource = ResourceSpec.UNKNOWN;
final List<ResourceSpec> resourceSpecs =
Arrays.asList(resource, resource, resource, resource);
@@ -1295,20 +1255,20 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
double expectedStateBackendFrac,
Configuration tmConfig) {
final double delta = 0.000001;
- Assertions.assertThat(
+ assertThat(
streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(
ManagedMemoryUseCase.STATE_BACKEND,
tmConfig,
ClassLoader.getSystemClassLoader()))
.isCloseTo(expectedStateBackendFrac, Offset.offset(delta));
- Assertions.assertThat(
+ assertThat(
streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(
ManagedMemoryUseCase.PYTHON,
tmConfig,
ClassLoader.getSystemClassLoader()))
.isCloseTo(expectedPythonFrac, Offset.offset(delta));
- Assertions.assertThat(
+ assertThat(
streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(
ManagedMemoryUseCase.OPERATOR,
tmConfig,
@@ -1354,7 +1314,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
}
@Test
- public void testSlotSharingOnAllVerticesInSameSlotSharingGroupByDefaultEnabled() {
+ void testSlotSharingOnAllVerticesInSameSlotSharingGroupByDefaultEnabled() {
final StreamGraph streamGraph = createStreamGraphForSlotSharingTest(new Configuration());
// specify slot sharing group for map1
streamGraph.getStreamNodes().stream()
@@ -1366,7 +1326,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
final List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
- Assertions.assertThat(verticesSorted.size()).isEqualTo(4);
+ assertThat(verticesSorted).hasSize(4);
final List<JobVertex> verticesMatched = getExpectedVerticesList(verticesSorted);
final JobVertex source1Vertex = verticesMatched.get(0);
@@ -1381,13 +1341,13 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
}
@Test
- public void testSlotSharingOnAllVerticesInSameSlotSharingGroupByDefaultDisabled() {
+ void testSlotSharingOnAllVerticesInSameSlotSharingGroupByDefaultDisabled() {
final StreamGraph streamGraph = createStreamGraphForSlotSharingTest(new Configuration());
streamGraph.setAllVerticesInSameSlotSharingGroupByDefault(false);
final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
final List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
- Assertions.assertThat(verticesSorted.size()).isEqualTo(4);
+ assertThat(verticesSorted).hasSize(4);
final List<JobVertex> verticesMatched = getExpectedVerticesList(verticesSorted);
final JobVertex source1Vertex = verticesMatched.get(0);
@@ -1400,7 +1360,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
}
@Test
- public void testSlotSharingResourceConfiguration() {
+ void testSlotSharingResourceConfiguration() {
final String slotSharingGroup1 = "slot-a";
final String slotSharingGroup2 = "slot-b";
final ResourceProfile resourceProfile1 = ResourceProfile.fromResources(1, 10);
@@ -1431,25 +1391,25 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
for (JobVertex jobVertex : jobGraph.getVertices()) {
numVertex += 1;
if (jobVertex.getName().contains(slotSharingGroup1)) {
- Assertions.assertThat(resourceProfile1)
- .isEqualTo(jobVertex.getSlotSharingGroup().getResourceProfile());
+ assertThat(jobVertex.getSlotSharingGroup().getResourceProfile())
+ .isEqualTo(resourceProfile1);
} else if (jobVertex.getName().contains(slotSharingGroup2)) {
- Assertions.assertThat(resourceProfile2)
- .isEqualTo(jobVertex.getSlotSharingGroup().getResourceProfile());
+ assertThat(jobVertex.getSlotSharingGroup().getResourceProfile())
+ .isEqualTo(resourceProfile2);
} else if (jobVertex
.getName()
.contains(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP)) {
- Assertions.assertThat(resourceProfile3)
- .isEqualTo(jobVertex.getSlotSharingGroup().getResourceProfile());
+ assertThat(jobVertex.getSlotSharingGroup().getResourceProfile())
+ .isEqualTo(resourceProfile3);
} else {
Assertions.fail("");
}
}
- assertThat(numVertex, is(3));
+ assertThat(numVertex).isEqualTo(3);
}
@Test
- public void testSlotSharingResourceConfigurationWithDefaultSlotSharingGroup() {
+ void testSlotSharingResourceConfigurationWithDefaultSlotSharingGroup() {
final ResourceProfile resourceProfile = ResourceProfile.fromResources(1, 10);
final Map<String, ResourceProfile> slotSharingGroupResource = new HashMap<>();
slotSharingGroupResource.put(
@@ -1465,23 +1425,22 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
int numVertex = 0;
for (JobVertex jobVertex : jobGraph.getVertices()) {
numVertex += 1;
- Assertions.assertThat(resourceProfile)
- .isEqualTo(jobVertex.getSlotSharingGroup().getResourceProfile());
+ assertThat(jobVertex.getSlotSharingGroup().getResourceProfile())
+ .isEqualTo(resourceProfile);
}
- assertThat(numVertex, is(2));
+ assertThat(numVertex).isEqualTo(2);
}
@Test
- public void testNamingOfChainedMultipleInputs() {
+ void testNamingOfChainedMultipleInputs() {
String[] sources = new String[] {"source-1", "source-2", "source-3"};
JobGraph graph = createGraphWithMultipleInputs(true, sources);
JobVertex head = graph.getVerticesSortedTopologicallyFromSources().iterator().next();
- Assertions.assertThat(sources)
- .allMatch(source -> head.getOperatorPrettyName().contains(source));
+ assertThat(sources).allMatch(source -> head.getOperatorPrettyName().contains(source));
}
@Test
- public void testNamingOfNonChainedMultipleInputs() {
+ void testNamingOfNonChainedMultipleInputs() {
String[] sources = new String[] {"source-1", "source-2", "source-3"};
JobGraph graph = createGraphWithMultipleInputs(false, sources);
JobVertex head =
@@ -1490,12 +1449,10 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
vertex ->
vertex.getInvokableClassName()
.equals(MultipleInputStreamTask.class.getName()));
- Assertions.assertThat(head.getName().contains("source-1"))
- .withFailMessage(head.getName())
- .isFalse();
- Assertions.assertThat(head.getOperatorPrettyName().contains("source-1"))
+ assertThat(head.getName()).withFailMessage(head.getName()).doesNotContain("source-1");
+ assertThat(head.getOperatorPrettyName())
.withFailMessage(head.getOperatorPrettyName())
- .isFalse();
+ .doesNotContain("source-1");
}
public JobGraph createGraphWithMultipleInputs(boolean chain, String... inputNames) {
@@ -1521,12 +1478,12 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
}
@Test
- public void testTreeDescription() {
+ void testTreeDescription() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
JobGraph job = createJobGraphWithDescription(env, "test source");
JobVertex[] allVertices = job.getVerticesAsArray();
- Assertions.assertThat(allVertices.length).isEqualTo(1);
- Assertions.assertThat(allVertices[0].getOperatorPrettyName())
+ assertThat(allVertices).hasSize(1);
+ assertThat(allVertices[0].getOperatorPrettyName())
.isEqualTo(
"test source\n"
+ ":- x + 1\n"
@@ -1538,12 +1495,12 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
}
@Test
- public void testTreeDescriptionWithChainedSource() {
+ void testTreeDescriptionWithChainedSource() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
JobGraph job = createJobGraphWithDescription(env, "test source 1", "test source 2");
JobVertex[] allVertices = job.getVerticesAsArray();
- Assertions.assertThat(allVertices.length).isEqualTo(1);
- Assertions.assertThat(allVertices[0].getOperatorPrettyName())
+ assertThat(allVertices).hasSize(1);
+ assertThat(allVertices[0].getOperatorPrettyName())
.isEqualTo(
"operator chained with source [test source 1, test source 2]\n"
+ ":- x + 1\n"
@@ -1555,7 +1512,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
}
@Test
- public void testCascadingDescription() {
+ void testCascadingDescription() {
final Configuration config = new Configuration();
config.set(
PipelineOptions.VERTEX_DESCRIPTION_MODE,
@@ -1564,14 +1521,14 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
StreamExecutionEnvironment.getExecutionEnvironment(config);
JobGraph job = createJobGraphWithDescription(env, "test source");
JobVertex[] allVertices = job.getVerticesAsArray();
- Assertions.assertThat(allVertices.length).isEqualTo(1);
- Assertions.assertThat(allVertices[0].getOperatorPrettyName())
+ assertThat(allVertices).hasSize(1);
+ assertThat(allVertices[0].getOperatorPrettyName())
.isEqualTo(
"test source -> (x + 1 -> (first print of map1 , second print of map1) , x + 2 -> (first print of map2 , second print of map2))");
}
@Test
- public void testCascadingDescriptionWithChainedSource() {
+ void testCascadingDescriptionWithChainedSource() {
final Configuration config = new Configuration();
config.set(
PipelineOptions.VERTEX_DESCRIPTION_MODE,
@@ -1580,38 +1537,38 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
StreamExecutionEnvironment.getExecutionEnvironment(config);
JobGraph job = createJobGraphWithDescription(env, "test source 1", "test source 2");
JobVertex[] allVertices = job.getVerticesAsArray();
- Assertions.assertThat(allVertices.length).isEqualTo(1);
- Assertions.assertThat(allVertices[0].getOperatorPrettyName())
+ assertThat(allVertices).hasSize(1);
+ assertThat(allVertices[0].getOperatorPrettyName())
.isEqualTo(
"operator chained with source [test source 1, test source 2] -> (x + 1 -> (first print of map1 , second print of map1) , x + 2 -> (first print of map2 , second print of map2))");
}
@Test
- public void testNamingWithoutIndex() {
+ void testNamingWithoutIndex() {
JobGraph job = createStreamGraphForSlotSharingTest(new Configuration()).getJobGraph();
List<JobVertex> allVertices = job.getVerticesSortedTopologicallyFromSources();
- Assertions.assertThat(allVertices.size()).isEqualTo(4);
- Assertions.assertThat(allVertices.get(0).getName()).isEqualTo("Source: source1");
- Assertions.assertThat(allVertices.get(1).getName()).isEqualTo("Source: source2");
- Assertions.assertThat(allVertices.get(2).getName()).isEqualTo("map1");
- Assertions.assertThat(allVertices.get(3).getName()).isEqualTo("map2");
+ assertThat(allVertices).hasSize(4);
+ assertThat(allVertices.get(0).getName()).isEqualTo("Source: source1");
+ assertThat(allVertices.get(1).getName()).isEqualTo("Source: source2");
+ assertThat(allVertices.get(2).getName()).isEqualTo("map1");
+ assertThat(allVertices.get(3).getName()).isEqualTo("map2");
}
@Test
- public void testNamingWithIndex() {
+ void testNamingWithIndex() {
Configuration config = new Configuration();
config.setBoolean(PipelineOptions.VERTEX_NAME_INCLUDE_INDEX_PREFIX, true);
JobGraph job = createStreamGraphForSlotSharingTest(config).getJobGraph();
List<JobVertex> allVertices = job.getVerticesSortedTopologicallyFromSources();
- Assertions.assertThat(allVertices.size()).isEqualTo(4);
- Assertions.assertThat(allVertices.get(0).getName()).isEqualTo("[vertex-0]Source: source1");
- Assertions.assertThat(allVertices.get(1).getName()).isEqualTo("[vertex-1]Source: source2");
- Assertions.assertThat(allVertices.get(2).getName()).isEqualTo("[vertex-2]map1");
- Assertions.assertThat(allVertices.get(3).getName()).isEqualTo("[vertex-3]map2");
+ assertThat(allVertices).hasSize(4);
+ assertThat(allVertices.get(0).getName()).isEqualTo("[vertex-0]Source: source1");
+ assertThat(allVertices.get(1).getName()).isEqualTo("[vertex-1]Source: source2");
+ assertThat(allVertices.get(2).getName()).isEqualTo("[vertex-2]map1");
+ assertThat(allVertices.get(3).getName()).isEqualTo("[vertex-3]map2");
}
@Test
- public void testCacheJobGraph() throws Throwable {
+ void testCacheJobGraph() throws Throwable {
final TestingStreamExecutionEnvironment env = new TestingStreamExecutionEnvironment();
env.setParallelism(2);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
@@ -1619,8 +1576,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
DataStream<Integer> source = env.fromElements(1, 2, 3).name("source");
CachedDataStream<Integer> cachedStream =
source.map(i -> i + 1).name("map-1").map(i -> i + 1).name("map-2").cache();
- Assertions.assertThat(cachedStream.getTransformation())
- .isInstanceOf(CacheTransformation.class);
+ assertThat(cachedStream.getTransformation()).isInstanceOf(CacheTransformation.class);
CacheTransformation<Integer> cacheTransformation =
(CacheTransformation<Integer>) cachedStream.getTransformation();
@@ -1628,7 +1584,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
List<JobVertex> allVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
- Assertions.assertThat(allVertices.size()).isEqualTo(3);
+ assertThat(allVertices).hasSize(3);
final JobVertex cacheWriteVertex =
allVertices.stream()
@@ -1641,13 +1597,13 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
"CacheWrite job vertex not found"));
final List<JobEdge> inputs = cacheWriteVertex.getInputs();
- Assertions.assertThat(inputs.size()).isEqualTo(1);
- Assertions.assertThat(inputs.get(0).getDistributionPattern()).isEqualTo(POINTWISE);
- Assertions.assertThat(inputs.get(0).getSource().getResultType())
+ assertThat(inputs).hasSize(1);
+ assertThat(inputs.get(0).getDistributionPattern()).isEqualTo(POINTWISE);
+ assertThat(inputs.get(0).getSource().getResultType())
.isEqualTo(ResultPartitionType.BLOCKING_PERSISTENT);
- Assertions.assertThat(new AbstractID(inputs.get(0).getSourceId()))
+ assertThat(new AbstractID(inputs.get(0).getSourceId()))
.isEqualTo(cacheTransformation.getDatasetId());
- Assertions.assertThat(inputs.get(0).getSource().getProducer().getName())
+ assertThat(inputs.get(0).getSource().getProducer().getName())
.isEqualTo("map-1 -> map-2 -> Sink: print");
env.addCompletedClusterDataset(cacheTransformation.getDatasetId());
@@ -1655,13 +1611,10 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
jobGraph = env.getStreamGraph().getJobGraph();
allVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
- Assertions.assertThat(allVertices.size()).isEqualTo(1);
- Assertions.assertThat(allVertices.get(0).getName()).isEqualTo("CacheRead -> Sink: print");
- Assertions.assertThat(allVertices.get(0).getIntermediateDataSetIdsToConsume().size())
- .isEqualTo(1);
- Assertions.assertThat(
- new AbstractID(
- allVertices.get(0).getIntermediateDataSetIdsToConsume().get(0)))
+ assertThat(allVertices).hasSize(1);
+ assertThat(allVertices.get(0).getName()).isEqualTo("CacheRead -> Sink: print");
+ assertThat(allVertices.get(0).getIntermediateDataSetIdsToConsume()).hasSize(1);
+ assertThat(new AbstractID(allVertices.get(0).getIntermediateDataSetIdsToConsume().get(0)))
.isEqualTo(cacheTransformation.getDatasetId());
}
@@ -1670,7 +1623,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
* dataset if they have the same parallelism and partitioner.
*/
@Test
- public void testIntermediateDataSetReuse() {
+ void testIntermediateDataSetReuse() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setBufferTimeout(-1);
DataStream<Integer> source = env.fromElements(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
@@ -1700,25 +1653,25 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
- Assertions.assertThat(vertices.size()).isEqualTo(9);
+ assertThat(vertices).hasSize(9);
JobVertex sourceVertex = vertices.get(0);
List<IntermediateDataSetID> producedDataSet =
sourceVertex.getProducedDataSets().stream()
.map(IntermediateDataSet::getId)
.collect(Collectors.toList());
- Assertions.assertThat(producedDataSet.size()).isEqualTo(6);
+ assertThat(producedDataSet).hasSize(6);
JobVertex sinkVertex1 = checkNotNull(findJobVertexWithName(vertices, "sink1"));
JobVertex sinkVertex2 = checkNotNull(findJobVertexWithName(vertices, "sink2"));
JobVertex sinkVertex3 = checkNotNull(findJobVertexWithName(vertices, "sink3"));
JobVertex sinkVertex4 = checkNotNull(findJobVertexWithName(vertices, "sink4"));
- Assertions.assertThat(sinkVertex2.getInputs().get(0).getSource().getId())
+ assertThat(sinkVertex2.getInputs().get(0).getSource().getId())
.isEqualTo(sinkVertex1.getInputs().get(0).getSource().getId());
- Assertions.assertThat(sinkVertex4.getInputs().get(0).getSource().getId())
+ assertThat(sinkVertex4.getInputs().get(0).getSource().getId())
.isEqualTo(sinkVertex3.getInputs().get(0).getSource().getId());
- Assertions.assertThat(sinkVertex3.getInputs().get(0).getSource().getId())
+ assertThat(sinkVertex3.getInputs().get(0).getSource().getId())
.isNotEqualTo(sinkVertex1.getInputs().get(0).getSource().getId());
StreamConfig streamConfig = new StreamConfig(sourceVertex.getConfiguration());
@@ -1726,18 +1679,16 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
streamConfig.getOperatorNonChainedOutputs(getClass().getClassLoader()).stream()
.map(NonChainedOutput::getDataSetId)
.collect(Collectors.toList());
- Assertions.assertThat(nonChainedOutputs.size()).isEqualTo(5);
- Assertions.assertThat(
- nonChainedOutputs.contains(
- sinkVertex3.getInputs().get(0).getSource().getId()))
- .isFalse();
+ assertThat(nonChainedOutputs).hasSize(5);
+ assertThat(nonChainedOutputs)
+ .doesNotContain(sinkVertex3.getInputs().get(0).getSource().getId());
List<IntermediateDataSetID> streamOutputsInOrder =
streamConfig.getVertexNonChainedOutputs(getClass().getClassLoader()).stream()
.map(NonChainedOutput::getDataSetId)
.collect(Collectors.toList());
- Assertions.assertThat(streamOutputsInOrder.size()).isEqualTo(6);
- Assertions.assertThat(streamOutputsInOrder.toArray()).isEqualTo(producedDataSet.toArray());
+ assertThat(streamOutputsInOrder).hasSize(6);
+ assertThat(streamOutputsInOrder).isEqualTo(producedDataSet);
}
private static JobVertex findJobVertexWithName(List<JobVertex> vertices, String name) {
@@ -1841,7 +1792,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
private void assertSameSlotSharingGroup(JobVertex... vertices) {
for (int i = 0; i < vertices.length - 1; i++) {
- Assertions.assertThat(vertices[i + 1].getSlotSharingGroup())
+ assertThat(vertices[i + 1].getSlotSharingGroup())
.isEqualTo(vertices[i].getSlotSharingGroup());
}
}
@@ -1849,7 +1800,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
private void assertDistinctSharingGroups(JobVertex... vertices) {
for (int i = 0; i < vertices.length - 1; i++) {
for (int j = i + 1; j < vertices.length; j++) {
- Assertions.assertThat(vertices[i].getSlotSharingGroup())
+ assertThat(vertices[i].getSlotSharingGroup())
.isNotEqualTo(vertices[j].getSlotSharingGroup());
}
}