You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/07/29 12:22:37 UTC

[flink] 02/02: [FLINK-18656][tests] Rename MultipleInputStreamTaskTestHarnessBuilder to StreamTaskMailboxTestHarnessBuilder

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b122f8ff92e78ec17d5692e02da5aa922e0d3985
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue Jul 28 16:35:51 2020 +0200

    [FLINK-18656][tests] Rename MultipleInputStreamTaskTestHarnessBuilder to StreamTaskMailboxTestHarnessBuilder
    
    The original concept MultipleInputStreamTaskTestHarnessBuilder proved much more
    versatile than initially expected and it can easily handle all of the use cases:
    - MultipleInputStreamTask
    - OneInputStreamTask
    - SourceStreamTask
    
    Hence there is no need for the abstraction and no need to provide specialized versions of
    MultipleInputStreamTaskTestHarnessBuilder for the other types of tasks.
---
 .../runtime/tasks/MultipleInputStreamTaskTest.java |  22 ++---
 .../MultipleInputStreamTaskTestHarnessBuilder.java | 108 ---------------------
 .../tasks/SourceOperatorStreamTaskTest.java        |   4 +-
 .../runtime/tasks/SourceStreamTaskTest.java        |   4 +-
 .../tasks/StreamTaskMailboxTestHarnessBuilder.java |  61 +++++++++++-
 ...treamTaskMultipleInputSelectiveReadingTest.java |   4 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    |  12 +--
 7 files changed, 82 insertions(+), 133 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
index ed3bd77..51bb156 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
@@ -81,7 +81,7 @@ public class MultipleInputStreamTaskTest {
 	@Test
 	public void testOpenCloseAndTimestamps() throws Exception {
 		try (StreamTaskMailboxTestHarness<String> testHarness =
-			new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+			new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
 				.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 				.addInput(BasicTypeInfo.INT_TYPE_INFO)
 				.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO)
@@ -111,7 +111,7 @@ public class MultipleInputStreamTaskTest {
 	@Test
 	public void testCheckpointBarriers() throws Exception {
 		try (StreamTaskMailboxTestHarness<String> testHarness =
-			new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+			new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
 				.addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
 				.addInput(BasicTypeInfo.INT_TYPE_INFO, 2)
 				.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
@@ -158,7 +158,7 @@ public class MultipleInputStreamTaskTest {
 	@Test
 	public void testOvertakingCheckpointBarriers() throws Exception {
 		try (StreamTaskMailboxTestHarness<String> testHarness =
-			new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+			new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
 				.addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
 				.addInput(BasicTypeInfo.INT_TYPE_INFO, 2)
 				.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
@@ -219,7 +219,7 @@ public class MultipleInputStreamTaskTest {
 		};
 
 		try (StreamTaskMailboxTestHarness<String> testHarness =
-			new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+			new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
 				.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 				.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 				.addInput(BasicTypeInfo.STRING_TYPE_INFO)
@@ -283,7 +283,7 @@ public class MultipleInputStreamTaskTest {
 	@Test
 	public void testClosingAllOperatorsOnChainProperly() throws Exception {
 		StreamTaskMailboxTestHarness<String> testHarness =
-			new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+			new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
 				.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 				.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 				.addInput(BasicTypeInfo.STRING_TYPE_INFO)
@@ -330,7 +330,7 @@ public class MultipleInputStreamTaskTest {
 	@Test
 	public void testInputFairness() throws Exception {
 		try (StreamTaskMailboxTestHarness<String> testHarness =
-				new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+				new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
 					.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 					.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 					.addInput(BasicTypeInfo.STRING_TYPE_INFO)
@@ -369,7 +369,7 @@ public class MultipleInputStreamTaskTest {
 	@Test
 	public void testWatermark() throws Exception {
 		try (StreamTaskMailboxTestHarness<String> testHarness =
-				new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+				new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
 					.addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
 					.addInput(BasicTypeInfo.INT_TYPE_INFO, 2)
 					.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
@@ -441,7 +441,7 @@ public class MultipleInputStreamTaskTest {
 	@Test
 	public void testWatermarkAndStreamStatusForwarding() throws Exception {
 		try (StreamTaskMailboxTestHarness<String> testHarness =
-				new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+				new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
 					.addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
 					.addInput(BasicTypeInfo.INT_TYPE_INFO, 2)
 					.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
@@ -502,7 +502,7 @@ public class MultipleInputStreamTaskTest {
 		};
 
 		try (StreamTaskMailboxTestHarness<String> testHarness =
-				new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+				new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
 					.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 					.addInput(BasicTypeInfo.INT_TYPE_INFO)
 					.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO)
@@ -588,7 +588,7 @@ public class MultipleInputStreamTaskTest {
 		final TaskMetricGroup taskMetricGroup = new StreamTaskTestHarness.TestTaskMetricGroup(metrics);
 
 		try (StreamTaskMailboxTestHarness<String> testHarness =
-				new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+				new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
 					.addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
 					.addInput(BasicTypeInfo.INT_TYPE_INFO, 2)
 					.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
@@ -610,7 +610,7 @@ public class MultipleInputStreamTaskTest {
 		final TaskMetricGroup taskMetricGroup = new StreamTaskTestHarness.TestTaskMetricGroup(metrics);
 
 		try (StreamTaskMailboxTestHarness<String> testHarness =
-				new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+				new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
 					.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 					.addInput(BasicTypeInfo.INT_TYPE_INFO)
 					.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO)
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTestHarnessBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTestHarnessBuilder.java
deleted file mode 100644
index cd0627e..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTestHarnessBuilder.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.graph.StreamNode;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
-import org.apache.flink.util.function.FunctionWithException;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Builder to create a {@link StreamTaskMailboxTestHarness} to test {@link MultipleInputStreamTask}.
- */
-public class MultipleInputStreamTaskTestHarnessBuilder<OUT> extends StreamTaskMailboxTestHarnessBuilder<OUT> {
-
-	private final ArrayList<TypeSerializer<?>> inputSerializers = new ArrayList<>();
-	private final ArrayList<Integer> inputChannelsPerGate = new ArrayList<>();
-
-	public MultipleInputStreamTaskTestHarnessBuilder(
-			FunctionWithException<Environment, ? extends StreamTask<OUT, ?>, Exception> taskFactory,
-			TypeInformation<OUT> outputType) {
-		super(taskFactory, outputType);
-	}
-
-	public MultipleInputStreamTaskTestHarnessBuilder<OUT> addInput(TypeInformation<?> inputType) {
-		return addInput(inputType, 1);
-	}
-
-	public MultipleInputStreamTaskTestHarnessBuilder<OUT> addInput(TypeInformation<?> inputType, int inputChannels) {
-		return addInput(inputType, inputChannels, null);
-	}
-
-	public MultipleInputStreamTaskTestHarnessBuilder<OUT> addInput(
-			TypeInformation<?> inputType,
-			int inputChannels,
-			@Nullable KeySelector<?, ?> keySelector) {
-		streamConfig.setStatePartitioner(inputSerializers.size(), keySelector);
-		inputSerializers.add(inputType.createSerializer(executionConfig));
-		inputChannelsPerGate.add(inputChannels);
-		return this;
-	}
-
-	@Override
-	protected void initializeInputs(StreamMockEnvironment streamMockEnvironment) {
-		inputGates = new StreamTestSingleInputGate[inputSerializers.size()];
-		List<StreamEdge> inPhysicalEdges = new LinkedList<>();
-
-		StreamOperator<?> dummyOperator = new AbstractStreamOperator<Object>() {
-			private static final long serialVersionUID = 1L;
-		};
-
-		StreamNode sourceVertexDummy = new StreamNode(0, "default group", null, dummyOperator, "source dummy", new LinkedList<>(), SourceStreamTask.class);
-		StreamNode targetVertexDummy = new StreamNode(1, "default group", null, dummyOperator, "target dummy", new LinkedList<>(), SourceStreamTask.class);
-
-		for (int i = 0; i < inputSerializers.size(); i++) {
-			TypeSerializer<?> inputSerializer = inputSerializers.get(i);
-			inputGates[i] = new StreamTestSingleInputGate<>(
-				inputChannelsPerGate.get(i),
-				i,
-				inputSerializer,
-				bufferSize);
-
-			StreamEdge streamEdge = new StreamEdge(
-				sourceVertexDummy,
-				targetVertexDummy,
-				i + 1,
-				new LinkedList<>(),
-				new BroadcastPartitioner<>(),
-				null);
-
-			inPhysicalEdges.add(streamEdge);
-			streamMockEnvironment.addInputGate(inputGates[i].getInputGate());
-		}
-
-		streamConfig.setInPhysicalEdges(inPhysicalEdges);
-		streamConfig.setNumberOfInputs(inputGates.length);
-		streamConfig.setTypeSerializersIn(inputSerializers.toArray(new TypeSerializer[inputSerializers.size()]));
-	}
-}
-
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
index df60344..56475ed 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
@@ -136,8 +136,8 @@ public class SourceOperatorStreamTaskTest {
 				WatermarkStrategy.noWatermarks());
 
 		// build a test harness.
-		MultipleInputStreamTaskTestHarnessBuilder<Integer> builder =
-				new MultipleInputStreamTaskTestHarnessBuilder<>(SourceOperatorStreamTask::new, BasicTypeInfo.INT_TYPE_INFO);
+		StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+				new StreamTaskMailboxTestHarnessBuilder<>(SourceOperatorStreamTask::new, BasicTypeInfo.INT_TYPE_INFO);
 		if (snapshot != null) {
 			// Set initial snapshot if needed.
 			builder.setTaskStateSnapshot(checkpointId, snapshot);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index e63ad6c..27c2576 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -113,8 +113,8 @@ public class SourceStreamTaskTest {
 	@Test(timeout = 60_000)
 	public void testStartDelayMetric() throws Exception {
 		long sleepTime = 42;
-		MultipleInputStreamTaskTestHarnessBuilder<String> builder =
-			new MultipleInputStreamTaskTestHarnessBuilder<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+		StreamTaskMailboxTestHarnessBuilder<String> builder =
+			new StreamTaskMailboxTestHarnessBuilder<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
 
 		final Map<String, Metric> metrics = new ConcurrentHashMap<>();
 		final TaskMetricGroup taskMetricGroup = new StreamTaskTestHarness.TestTaskMetricGroup(metrics);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
index e9fb768..6946156 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.Environment;
@@ -48,6 +49,7 @@ import org.apache.flink.util.function.FunctionWithException;
 import javax.annotation.Nullable;
 
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -60,7 +62,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
  * Builder class for {@link StreamTaskMailboxTestHarness}.
  */
-public abstract class StreamTaskMailboxTestHarnessBuilder<OUT> {
+public class StreamTaskMailboxTestHarnessBuilder<OUT> {
 	protected final FunctionWithException<Environment, ? extends StreamTask<OUT, ?>, Exception> taskFactory;
 	protected final TypeSerializer<OUT> outputSerializer;
 	protected final ExecutionConfig executionConfig = new ExecutionConfig();
@@ -77,6 +79,9 @@ public abstract class StreamTaskMailboxTestHarnessBuilder<OUT> {
 	protected TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
 	protected Map<Long, TaskStateSnapshot> taskStateSnapshots;
 
+	protected final ArrayList<TypeSerializer<?>> inputSerializers = new ArrayList<>();
+	protected final ArrayList<Integer> inputChannelsPerGate = new ArrayList<>();
+
 	private boolean setupCalled = false;
 
 	public StreamTaskMailboxTestHarnessBuilder(
@@ -86,6 +91,24 @@ public abstract class StreamTaskMailboxTestHarnessBuilder<OUT> {
 		outputSerializer = outputType.createSerializer(executionConfig);
 	}
 
+	public StreamTaskMailboxTestHarnessBuilder<OUT> addInput(TypeInformation<?> inputType) {
+		return addInput(inputType, 1);
+	}
+
+	public StreamTaskMailboxTestHarnessBuilder<OUT> addInput(TypeInformation<?> inputType, int inputChannels) {
+		return addInput(inputType, inputChannels, null);
+	}
+
+	public StreamTaskMailboxTestHarnessBuilder<OUT> addInput(
+			TypeInformation<?> inputType,
+			int inputChannels,
+			@Nullable KeySelector<?, ?> keySelector) {
+		streamConfig.setStatePartitioner(inputSerializers.size(), keySelector);
+		inputSerializers.add(inputType.createSerializer(executionConfig));
+		inputChannelsPerGate.add(inputChannels);
+		return this;
+	}
+
 	public StreamTaskMailboxTestHarness<OUT> build() throws Exception {
 		streamConfig.setBufferTimeout(bufferTimeout);
 
@@ -125,7 +148,41 @@ public abstract class StreamTaskMailboxTestHarnessBuilder<OUT> {
 			streamMockEnvironment);
 	}
 
-	protected abstract void initializeInputs(StreamMockEnvironment streamMockEnvironment);
+	protected void initializeInputs(StreamMockEnvironment streamMockEnvironment) {
+		inputGates = new StreamTestSingleInputGate[inputSerializers.size()];
+		List<StreamEdge> inPhysicalEdges = new LinkedList<>();
+
+		StreamOperator<?> dummyOperator = new AbstractStreamOperator<Object>() {
+			private static final long serialVersionUID = 1L;
+		};
+
+		StreamNode sourceVertexDummy = new StreamNode(0, "default group", null, dummyOperator, "source dummy", new LinkedList<>(), SourceStreamTask.class);
+		StreamNode targetVertexDummy = new StreamNode(1, "default group", null, dummyOperator, "target dummy", new LinkedList<>(), SourceStreamTask.class);
+
+		for (int i = 0; i < inputSerializers.size(); i++) {
+			TypeSerializer<?> inputSerializer = inputSerializers.get(i);
+			inputGates[i] = new StreamTestSingleInputGate<>(
+				inputChannelsPerGate.get(i),
+				i,
+				inputSerializer,
+				bufferSize);
+
+			StreamEdge streamEdge = new StreamEdge(
+				sourceVertexDummy,
+				targetVertexDummy,
+				i + 1,
+				new LinkedList<>(),
+				new BroadcastPartitioner<>(),
+				null);
+
+			inPhysicalEdges.add(streamEdge);
+			streamMockEnvironment.addInputGate(inputGates[i].getInputGate());
+		}
+
+		streamConfig.setInPhysicalEdges(inPhysicalEdges);
+		streamConfig.setNumberOfInputs(inputGates.length);
+		streamConfig.setTypeSerializersIn(inputSerializers.toArray(new TypeSerializer[inputSerializers.size()]));
+	}
 
 	public StreamTaskMailboxTestHarnessBuilder<OUT> setupOutputForSingletonOperatorChain(StreamOperator<?> operator) {
 		return setupOutputForSingletonOperatorChain(SimpleOperatorFactory.of(operator), new OperatorID());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest.java
index ef01cf3..b51a701 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest.java
@@ -140,7 +140,7 @@ public class StreamTaskMultipleInputSelectiveReadingTest {
 			ArrayDeque<Object> expectedOutput,
 			boolean orderedCheck) throws Exception {
 		try (StreamTaskMailboxTestHarness<String> testHarness =
-			new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+			new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
 				.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 				.addInput(BasicTypeInfo.INT_TYPE_INFO)
 				.setupOutputForSingletonOperatorChain(streamOperatorFactory)
@@ -176,7 +176,7 @@ public class StreamTaskMultipleInputSelectiveReadingTest {
 	@Test
 	public void testInputStarvation() throws Exception {
 		try (StreamTaskMailboxTestHarness<String> testHarness =
-				new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+				new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
 					.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 					.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 					.addInput(BasicTypeInfo.STRING_TYPE_INFO)
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 833f07b..09af3af 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -949,8 +949,8 @@ public class StreamTaskTest extends TestLogger {
 	@Test
 	public void testNotifyCheckpointOnClosedOperator() throws Throwable {
 		ClosingOperator operator = new ClosingOperator();
-		MultipleInputStreamTaskTestHarnessBuilder<Integer> builder =
-			new MultipleInputStreamTaskTestHarnessBuilder<>(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+		StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+			new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
 				.addInput(BasicTypeInfo.INT_TYPE_INFO);
 		StreamTaskMailboxTestHarness<Integer> harness = builder
 			.setupOutputForSingletonOperatorChain(operator)
@@ -984,8 +984,8 @@ public class StreamTaskTest extends TestLogger {
 
 	private void testFailToConfirmCheckpointMessage(Consumer<StreamTask<?, ?>> consumer) throws Exception {
 		StreamMap<Integer, Integer> streamMap = new StreamMap<>(new FailOnNotifyCheckpointMapper<>());
-		MultipleInputStreamTaskTestHarnessBuilder<Integer> builder =
-			new MultipleInputStreamTaskTestHarnessBuilder<>(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+		StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+			new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
 				.addInput(BasicTypeInfo.INT_TYPE_INFO);
 		StreamTaskMailboxTestHarness<Integer> harness = builder
 			.setupOutputForSingletonOperatorChain(streamMap)
@@ -1008,8 +1008,8 @@ public class StreamTaskTest extends TestLogger {
 	@Test
 	public void testCheckpointDeclinedOnClosedOperator() throws Throwable {
 		ClosingOperator operator = new ClosingOperator();
-		MultipleInputStreamTaskTestHarnessBuilder<Integer> builder =
-			new MultipleInputStreamTaskTestHarnessBuilder<>(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+		StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+			new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
 					.addInput(BasicTypeInfo.INT_TYPE_INFO);
 		StreamTaskMailboxTestHarness<Integer> harness = builder
 			.setupOutputForSingletonOperatorChain(operator)