You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/07/29 12:22:37 UTC
[flink] 02/02: [FLINK-18656][tests] Rename
MultipleInputStreamTaskTestHarnessBuilder to
StreamTaskMailboxTestHarnessBuilder
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b122f8ff92e78ec17d5692e02da5aa922e0d3985
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue Jul 28 16:35:51 2020 +0200
[FLINK-18656][tests] Rename MultipleInputStreamTaskTestHarnessBuilder to StreamTaskMailboxTestHarnessBuilder
The orignal concept MultipleInputStreamTaskTestHarnessBuilder proved much more
versatile then initially expected and it can easily handle all of the uses cases:
- MultipleInputStreamTask
- OneInputStreamTask
- SourceStreamTask
Hence there is no need for the abstraction and no need to provide specialized versions of
MultipleInputStreamTaskTestHarnessBuilder for the other types of tasks.
---
.../runtime/tasks/MultipleInputStreamTaskTest.java | 22 ++---
.../MultipleInputStreamTaskTestHarnessBuilder.java | 108 ---------------------
.../tasks/SourceOperatorStreamTaskTest.java | 4 +-
.../runtime/tasks/SourceStreamTaskTest.java | 4 +-
.../tasks/StreamTaskMailboxTestHarnessBuilder.java | 61 +++++++++++-
...treamTaskMultipleInputSelectiveReadingTest.java | 4 +-
.../streaming/runtime/tasks/StreamTaskTest.java | 12 +--
7 files changed, 82 insertions(+), 133 deletions(-)
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
index ed3bd77..51bb156 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
@@ -81,7 +81,7 @@ public class MultipleInputStreamTaskTest {
@Test
public void testOpenCloseAndTimestamps() throws Exception {
try (StreamTaskMailboxTestHarness<String> testHarness =
- new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+ new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.INT_TYPE_INFO)
.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO)
@@ -111,7 +111,7 @@ public class MultipleInputStreamTaskTest {
@Test
public void testCheckpointBarriers() throws Exception {
try (StreamTaskMailboxTestHarness<String> testHarness =
- new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+ new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
.addInput(BasicTypeInfo.INT_TYPE_INFO, 2)
.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
@@ -158,7 +158,7 @@ public class MultipleInputStreamTaskTest {
@Test
public void testOvertakingCheckpointBarriers() throws Exception {
try (StreamTaskMailboxTestHarness<String> testHarness =
- new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+ new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
.addInput(BasicTypeInfo.INT_TYPE_INFO, 2)
.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
@@ -219,7 +219,7 @@ public class MultipleInputStreamTaskTest {
};
try (StreamTaskMailboxTestHarness<String> testHarness =
- new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+ new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
@@ -283,7 +283,7 @@ public class MultipleInputStreamTaskTest {
@Test
public void testClosingAllOperatorsOnChainProperly() throws Exception {
StreamTaskMailboxTestHarness<String> testHarness =
- new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+ new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
@@ -330,7 +330,7 @@ public class MultipleInputStreamTaskTest {
@Test
public void testInputFairness() throws Exception {
try (StreamTaskMailboxTestHarness<String> testHarness =
- new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+ new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
@@ -369,7 +369,7 @@ public class MultipleInputStreamTaskTest {
@Test
public void testWatermark() throws Exception {
try (StreamTaskMailboxTestHarness<String> testHarness =
- new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+ new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
.addInput(BasicTypeInfo.INT_TYPE_INFO, 2)
.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
@@ -441,7 +441,7 @@ public class MultipleInputStreamTaskTest {
@Test
public void testWatermarkAndStreamStatusForwarding() throws Exception {
try (StreamTaskMailboxTestHarness<String> testHarness =
- new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+ new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
.addInput(BasicTypeInfo.INT_TYPE_INFO, 2)
.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
@@ -502,7 +502,7 @@ public class MultipleInputStreamTaskTest {
};
try (StreamTaskMailboxTestHarness<String> testHarness =
- new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+ new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.INT_TYPE_INFO)
.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO)
@@ -588,7 +588,7 @@ public class MultipleInputStreamTaskTest {
final TaskMetricGroup taskMetricGroup = new StreamTaskTestHarness.TestTaskMetricGroup(metrics);
try (StreamTaskMailboxTestHarness<String> testHarness =
- new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+ new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
.addInput(BasicTypeInfo.INT_TYPE_INFO, 2)
.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
@@ -610,7 +610,7 @@ public class MultipleInputStreamTaskTest {
final TaskMetricGroup taskMetricGroup = new StreamTaskTestHarness.TestTaskMetricGroup(metrics);
try (StreamTaskMailboxTestHarness<String> testHarness =
- new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+ new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.INT_TYPE_INFO)
.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO)
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTestHarnessBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTestHarnessBuilder.java
deleted file mode 100644
index cd0627e..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTestHarnessBuilder.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.graph.StreamNode;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
-import org.apache.flink.util.function.FunctionWithException;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Builder to create a {@link StreamTaskMailboxTestHarness} to test {@link MultipleInputStreamTask}.
- */
-public class MultipleInputStreamTaskTestHarnessBuilder<OUT> extends StreamTaskMailboxTestHarnessBuilder<OUT> {
-
- private final ArrayList<TypeSerializer<?>> inputSerializers = new ArrayList<>();
- private final ArrayList<Integer> inputChannelsPerGate = new ArrayList<>();
-
- public MultipleInputStreamTaskTestHarnessBuilder(
- FunctionWithException<Environment, ? extends StreamTask<OUT, ?>, Exception> taskFactory,
- TypeInformation<OUT> outputType) {
- super(taskFactory, outputType);
- }
-
- public MultipleInputStreamTaskTestHarnessBuilder<OUT> addInput(TypeInformation<?> inputType) {
- return addInput(inputType, 1);
- }
-
- public MultipleInputStreamTaskTestHarnessBuilder<OUT> addInput(TypeInformation<?> inputType, int inputChannels) {
- return addInput(inputType, inputChannels, null);
- }
-
- public MultipleInputStreamTaskTestHarnessBuilder<OUT> addInput(
- TypeInformation<?> inputType,
- int inputChannels,
- @Nullable KeySelector<?, ?> keySelector) {
- streamConfig.setStatePartitioner(inputSerializers.size(), keySelector);
- inputSerializers.add(inputType.createSerializer(executionConfig));
- inputChannelsPerGate.add(inputChannels);
- return this;
- }
-
- @Override
- protected void initializeInputs(StreamMockEnvironment streamMockEnvironment) {
- inputGates = new StreamTestSingleInputGate[inputSerializers.size()];
- List<StreamEdge> inPhysicalEdges = new LinkedList<>();
-
- StreamOperator<?> dummyOperator = new AbstractStreamOperator<Object>() {
- private static final long serialVersionUID = 1L;
- };
-
- StreamNode sourceVertexDummy = new StreamNode(0, "default group", null, dummyOperator, "source dummy", new LinkedList<>(), SourceStreamTask.class);
- StreamNode targetVertexDummy = new StreamNode(1, "default group", null, dummyOperator, "target dummy", new LinkedList<>(), SourceStreamTask.class);
-
- for (int i = 0; i < inputSerializers.size(); i++) {
- TypeSerializer<?> inputSerializer = inputSerializers.get(i);
- inputGates[i] = new StreamTestSingleInputGate<>(
- inputChannelsPerGate.get(i),
- i,
- inputSerializer,
- bufferSize);
-
- StreamEdge streamEdge = new StreamEdge(
- sourceVertexDummy,
- targetVertexDummy,
- i + 1,
- new LinkedList<>(),
- new BroadcastPartitioner<>(),
- null);
-
- inPhysicalEdges.add(streamEdge);
- streamMockEnvironment.addInputGate(inputGates[i].getInputGate());
- }
-
- streamConfig.setInPhysicalEdges(inPhysicalEdges);
- streamConfig.setNumberOfInputs(inputGates.length);
- streamConfig.setTypeSerializersIn(inputSerializers.toArray(new TypeSerializer[inputSerializers.size()]));
- }
-}
-
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
index df60344..56475ed 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
@@ -136,8 +136,8 @@ public class SourceOperatorStreamTaskTest {
WatermarkStrategy.noWatermarks());
// build a test harness.
- MultipleInputStreamTaskTestHarnessBuilder<Integer> builder =
- new MultipleInputStreamTaskTestHarnessBuilder<>(SourceOperatorStreamTask::new, BasicTypeInfo.INT_TYPE_INFO);
+ StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+ new StreamTaskMailboxTestHarnessBuilder<>(SourceOperatorStreamTask::new, BasicTypeInfo.INT_TYPE_INFO);
if (snapshot != null) {
// Set initial snapshot if needed.
builder.setTaskStateSnapshot(checkpointId, snapshot);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index e63ad6c..27c2576 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -113,8 +113,8 @@ public class SourceStreamTaskTest {
@Test(timeout = 60_000)
public void testStartDelayMetric() throws Exception {
long sleepTime = 42;
- MultipleInputStreamTaskTestHarnessBuilder<String> builder =
- new MultipleInputStreamTaskTestHarnessBuilder<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+ StreamTaskMailboxTestHarnessBuilder<String> builder =
+ new StreamTaskMailboxTestHarnessBuilder<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
final Map<String, Metric> metrics = new ConcurrentHashMap<>();
final TaskMetricGroup taskMetricGroup = new StreamTaskTestHarness.TestTaskMetricGroup(metrics);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
index e9fb768..6946156 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
@@ -48,6 +49,7 @@ import org.apache.flink.util.function.FunctionWithException;
import javax.annotation.Nullable;
import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -60,7 +62,7 @@ import static org.apache.flink.util.Preconditions.checkState;
/**
* Builder class for {@link StreamTaskMailboxTestHarness}.
*/
-public abstract class StreamTaskMailboxTestHarnessBuilder<OUT> {
+public class StreamTaskMailboxTestHarnessBuilder<OUT> {
protected final FunctionWithException<Environment, ? extends StreamTask<OUT, ?>, Exception> taskFactory;
protected final TypeSerializer<OUT> outputSerializer;
protected final ExecutionConfig executionConfig = new ExecutionConfig();
@@ -77,6 +79,9 @@ public abstract class StreamTaskMailboxTestHarnessBuilder<OUT> {
protected TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
protected Map<Long, TaskStateSnapshot> taskStateSnapshots;
+ protected final ArrayList<TypeSerializer<?>> inputSerializers = new ArrayList<>();
+ protected final ArrayList<Integer> inputChannelsPerGate = new ArrayList<>();
+
private boolean setupCalled = false;
public StreamTaskMailboxTestHarnessBuilder(
@@ -86,6 +91,24 @@ public abstract class StreamTaskMailboxTestHarnessBuilder<OUT> {
outputSerializer = outputType.createSerializer(executionConfig);
}
+ public StreamTaskMailboxTestHarnessBuilder<OUT> addInput(TypeInformation<?> inputType) {
+ return addInput(inputType, 1);
+ }
+
+ public StreamTaskMailboxTestHarnessBuilder<OUT> addInput(TypeInformation<?> inputType, int inputChannels) {
+ return addInput(inputType, inputChannels, null);
+ }
+
+ public StreamTaskMailboxTestHarnessBuilder<OUT> addInput(
+ TypeInformation<?> inputType,
+ int inputChannels,
+ @Nullable KeySelector<?, ?> keySelector) {
+ streamConfig.setStatePartitioner(inputSerializers.size(), keySelector);
+ inputSerializers.add(inputType.createSerializer(executionConfig));
+ inputChannelsPerGate.add(inputChannels);
+ return this;
+ }
+
public StreamTaskMailboxTestHarness<OUT> build() throws Exception {
streamConfig.setBufferTimeout(bufferTimeout);
@@ -125,7 +148,41 @@ public abstract class StreamTaskMailboxTestHarnessBuilder<OUT> {
streamMockEnvironment);
}
- protected abstract void initializeInputs(StreamMockEnvironment streamMockEnvironment);
+ protected void initializeInputs(StreamMockEnvironment streamMockEnvironment) {
+ inputGates = new StreamTestSingleInputGate[inputSerializers.size()];
+ List<StreamEdge> inPhysicalEdges = new LinkedList<>();
+
+ StreamOperator<?> dummyOperator = new AbstractStreamOperator<Object>() {
+ private static final long serialVersionUID = 1L;
+ };
+
+ StreamNode sourceVertexDummy = new StreamNode(0, "default group", null, dummyOperator, "source dummy", new LinkedList<>(), SourceStreamTask.class);
+ StreamNode targetVertexDummy = new StreamNode(1, "default group", null, dummyOperator, "target dummy", new LinkedList<>(), SourceStreamTask.class);
+
+ for (int i = 0; i < inputSerializers.size(); i++) {
+ TypeSerializer<?> inputSerializer = inputSerializers.get(i);
+ inputGates[i] = new StreamTestSingleInputGate<>(
+ inputChannelsPerGate.get(i),
+ i,
+ inputSerializer,
+ bufferSize);
+
+ StreamEdge streamEdge = new StreamEdge(
+ sourceVertexDummy,
+ targetVertexDummy,
+ i + 1,
+ new LinkedList<>(),
+ new BroadcastPartitioner<>(),
+ null);
+
+ inPhysicalEdges.add(streamEdge);
+ streamMockEnvironment.addInputGate(inputGates[i].getInputGate());
+ }
+
+ streamConfig.setInPhysicalEdges(inPhysicalEdges);
+ streamConfig.setNumberOfInputs(inputGates.length);
+ streamConfig.setTypeSerializersIn(inputSerializers.toArray(new TypeSerializer[inputSerializers.size()]));
+ }
public StreamTaskMailboxTestHarnessBuilder<OUT> setupOutputForSingletonOperatorChain(StreamOperator<?> operator) {
return setupOutputForSingletonOperatorChain(SimpleOperatorFactory.of(operator), new OperatorID());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest.java
index ef01cf3..b51a701 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest.java
@@ -140,7 +140,7 @@ public class StreamTaskMultipleInputSelectiveReadingTest {
ArrayDeque<Object> expectedOutput,
boolean orderedCheck) throws Exception {
try (StreamTaskMailboxTestHarness<String> testHarness =
- new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+ new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.INT_TYPE_INFO)
.setupOutputForSingletonOperatorChain(streamOperatorFactory)
@@ -176,7 +176,7 @@ public class StreamTaskMultipleInputSelectiveReadingTest {
@Test
public void testInputStarvation() throws Exception {
try (StreamTaskMailboxTestHarness<String> testHarness =
- new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+ new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 833f07b..09af3af 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -949,8 +949,8 @@ public class StreamTaskTest extends TestLogger {
@Test
public void testNotifyCheckpointOnClosedOperator() throws Throwable {
ClosingOperator operator = new ClosingOperator();
- MultipleInputStreamTaskTestHarnessBuilder<Integer> builder =
- new MultipleInputStreamTaskTestHarnessBuilder<>(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+ StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+ new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
.addInput(BasicTypeInfo.INT_TYPE_INFO);
StreamTaskMailboxTestHarness<Integer> harness = builder
.setupOutputForSingletonOperatorChain(operator)
@@ -984,8 +984,8 @@ public class StreamTaskTest extends TestLogger {
private void testFailToConfirmCheckpointMessage(Consumer<StreamTask<?, ?>> consumer) throws Exception {
StreamMap<Integer, Integer> streamMap = new StreamMap<>(new FailOnNotifyCheckpointMapper<>());
- MultipleInputStreamTaskTestHarnessBuilder<Integer> builder =
- new MultipleInputStreamTaskTestHarnessBuilder<>(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+ StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+ new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
.addInput(BasicTypeInfo.INT_TYPE_INFO);
StreamTaskMailboxTestHarness<Integer> harness = builder
.setupOutputForSingletonOperatorChain(streamMap)
@@ -1008,8 +1008,8 @@ public class StreamTaskTest extends TestLogger {
@Test
public void testCheckpointDeclinedOnClosedOperator() throws Throwable {
ClosingOperator operator = new ClosingOperator();
- MultipleInputStreamTaskTestHarnessBuilder<Integer> builder =
- new MultipleInputStreamTaskTestHarnessBuilder<>(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+ StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+ new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
.addInput(BasicTypeInfo.INT_TYPE_INFO);
StreamTaskMailboxTestHarness<Integer> harness = builder
.setupOutputForSingletonOperatorChain(operator)