You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/22 10:26:28 UTC

[flink] 03/10: [hotfix][tests][network] Introduce InputChannelBuilder for creation of InputChannels in tests

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e6b8423ee34c2c49f0ea6eb0955ced5db57dd18f
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Sat May 11 19:49:30 2019 +0200

    [hotfix][tests][network] Introduce InputChannelBuilder for creation of InputChannels in tests
---
 .../partition/consumer/RemoteInputChannel.java     |  11 --
 .../runtime/io/network/NetworkEnvironmentTest.java |  20 +--
 .../netty/PartitionRequestClientHandlerTest.java   |  22 +---
 .../network/partition/InputChannelTestUtils.java   |  47 ++-----
 .../network/partition/InputGateConcurrentTest.java |  12 +-
 .../network/partition/InputGateFairnessTest.java   |  10 +-
 .../partition/consumer/InputChannelBuilder.java    | 145 +++++++++++++++++++++
 .../partition/consumer/LocalInputChannelTest.java  |  19 +--
 .../partition/consumer/RemoteInputChannelTest.java |  68 +++-------
 .../partition/consumer/SingleInputGateTest.java    | 107 ++++-----------
 10 files changed, 228 insertions(+), 233 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 87e3f62..30246c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -108,17 +108,6 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
 		ResultPartitionID partitionId,
 		ConnectionID connectionId,
 		ConnectionManager connectionManager,
-		InputChannelMetrics metrics) {
-
-		this(inputGate, channelIndex, partitionId, connectionId, connectionManager, 0, 0, metrics);
-	}
-
-	public RemoteInputChannel(
-		SingleInputGate inputGate,
-		int channelIndex,
-		ResultPartitionID partitionId,
-		ConnectionID connectionId,
-		ConnectionManager connectionManager,
 		int initialBackOff,
 		int maxBackoff,
 		InputChannelMetrics metrics) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index a610838..81cd8f4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -19,11 +19,10 @@
 package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
 import org.apache.flink.runtime.taskmanager.Task;
@@ -288,7 +287,7 @@ public class NetworkEnvironmentTest {
 	 * @param numberOfChannels
 	 * 		the number of input channels
 	 *
-	 * @return input gate with some fake settings
+	 * @return input gate with some fake settiFngs
 	 */
 	private SingleInputGate createSingleInputGate(ResultPartitionType partitionType, int numberOfChannels) {
 		return spy(new SingleInputGateBuilder()
@@ -303,15 +302,10 @@ public class NetworkEnvironmentTest {
 			int channelIndex,
 			ResultPartition resultPartition,
 			ConnectionManager connManager) {
-		RemoteInputChannel channel = new RemoteInputChannel(
-			inputGate,
-			channelIndex,
-			resultPartition.getPartitionId(),
-			mock(ConnectionID.class),
-			connManager,
-			0,
-			0,
-			InputChannelTestUtils.newUnregisteredInputChannelMetrics());
-		inputGate.setInputChannel(resultPartition.getPartitionId().getPartitionId(), channel);
+		InputChannelBuilder.newBuilder()
+			.setChannelIndex(channelIndex)
+			.setPartitionId(resultPartition.getPartitionId())
+			.setConnectionManager(connManager)
+			.buildRemoteAndSetToGate(inputGate);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
index 487ab1a..16a2415 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
@@ -27,9 +27,9 @@ import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
 import org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse;
-import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
@@ -211,7 +211,7 @@ public class PartitionRequestClientHandlerTest {
 	 * @return The new created remote input channel.
 	 */
 	static RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate) throws Exception {
-		return createRemoteInputChannel(inputGate, mock(PartitionRequestClient.class));
+		return createRemoteInputChannel(inputGate, null);
 	}
 
 	/**
@@ -243,19 +243,11 @@ public class PartitionRequestClientHandlerTest {
 		when(connectionManager.createPartitionRequestClient(any(ConnectionID.class)))
 			.thenReturn(client);
 
-		ResultPartitionID partitionId = new ResultPartitionID();
-		RemoteInputChannel inputChannel = new RemoteInputChannel(
-			inputGate,
-			0,
-			partitionId,
-			mock(ConnectionID.class),
-			connectionManager,
-			initialBackoff,
-			maxBackoff,
-			InputChannelTestUtils.newUnregisteredInputChannelMetrics());
-
-		inputGate.setInputChannel(partitionId.getPartitionId(), inputChannel);
-		return inputChannel;
+		return InputChannelBuilder.newBuilder()
+			.setConnectionManager(connectionManager)
+			.setInitialBackoff(initialBackoff)
+			.setMaxBackoff(maxBackoff)
+			.buildRemoteAndSetToGate(inputGate);
 	}
 
 	/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index 82b4c5b..806c556 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
-import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
 import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
@@ -32,8 +32,6 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import java.net.InetSocketAddress;
-
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.mock;
@@ -43,9 +41,6 @@ import static org.mockito.Mockito.when;
  * Some utility methods used for testing InputChannels and InputGates.
  */
 public class InputChannelTestUtils {
-
-	private static final ConnectionID STUB_CONNECTION_ID = new ConnectionID(new InetSocketAddress("localhost", 5000), 0);
-
 	/**
 	 * Creates a result partition manager that ignores all IDs, and simply returns the given
 	 * subpartitions in sequence.
@@ -96,15 +91,10 @@ public class InputChannelTestUtils {
 		int channelIndex,
 		ResultPartitionManager partitionManager) {
 
-		return new LocalInputChannel(
-			inputGate,
-			channelIndex,
-			new ResultPartitionID(),
-			partitionManager,
-			new TaskEventDispatcher(),
-			0,
-			0,
-			newUnregisteredInputChannelMetrics());
+		return InputChannelBuilder.newBuilder()
+			.setChannelIndex(channelIndex)
+			.setPartitionManager(partitionManager)
+			.buildLocalAndSetToGate(inputGate);
 	}
 
 	public static LocalInputChannel createLocalInputChannel(
@@ -113,15 +103,11 @@ public class InputChannelTestUtils {
 		int initialBackoff,
 		int maxBackoff) {
 
-		return new LocalInputChannel(
-			inputGate,
-			0,
-			new ResultPartitionID(),
-			partitionManager,
-			new TaskEventDispatcher(),
-			initialBackoff,
-			maxBackoff,
-			newUnregisteredInputChannelMetrics());
+		return InputChannelBuilder.newBuilder()
+			.setPartitionManager(partitionManager)
+			.setInitialBackoff(initialBackoff)
+			.setMaxBackoff(maxBackoff)
+			.buildLocalAndSetToGate(inputGate);
 	}
 
 	public static RemoteInputChannel createRemoteInputChannel(
@@ -129,15 +115,10 @@ public class InputChannelTestUtils {
 		int channelIndex,
 		ConnectionManager connectionManager) {
 
-		return new RemoteInputChannel(
-			inputGate,
-			channelIndex,
-			new ResultPartitionID(),
-			STUB_CONNECTION_ID,
-			connectionManager,
-			0,
-			0,
-			newUnregisteredInputChannelMetrics());
+		return InputChannelBuilder.newBuilder()
+			.setChannelIndex(channelIndex)
+			.setConnectionManager(connectionManager)
+			.buildRemoteAndSetToGate(inputGate);
 	}
 
 	public static InputChannelMetrics newUnregisteredInputChannelMetrics() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
index f753f0f..95dc380 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
-import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 
@@ -63,9 +62,7 @@ public class InputGateConcurrentTest {
 		final SingleInputGate gate = createSingleInputGate(numberOfChannels);
 
 		for (int i = 0; i < numberOfChannels; i++) {
-			LocalInputChannel channel = createLocalInputChannel(gate, i, resultPartitionManager);
-			gate.setInputChannel(channel.getPartitionId().getPartitionId(), channel);
-
+			createLocalInputChannel(gate, i, resultPartitionManager);
 			partitions[i] = new PipelinedSubpartition(0, resultPartition);
 			sources[i] = new PipelinedSubpartitionSource(partitions[i]);
 		}
@@ -92,8 +89,6 @@ public class InputGateConcurrentTest {
 
 		for (int i = 0; i < numberOfChannels; i++) {
 			RemoteInputChannel channel = createRemoteInputChannel(gate, i, connManager);
-			gate.setInputChannel(channel.getPartitionId().getPartitionId(), channel);
-
 			sources[i] = new RemoteChannelSource(channel);
 		}
 
@@ -137,14 +132,11 @@ public class InputGateConcurrentTest {
 				localPartitions[local++] = psp;
 				sources[i] = new PipelinedSubpartitionSource(psp);
 
-				LocalInputChannel channel = createLocalInputChannel(gate, i, resultPartitionManager);
-				gate.setInputChannel(channel.getPartitionId().getPartitionId(), channel);
+				createLocalInputChannel(gate, i, resultPartitionManager);
 			}
 			else {
 				//remote channel
 				RemoteInputChannel channel = createRemoteInputChannel(gate, i, connManager);
-				gate.setInputChannel(channel.getPartitionId().getPartitionId(), channel);
-
 				sources[i] = new RemoteChannelSource(channel);
 			}
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index ada4d7f..6ed5f21 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
-import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
@@ -92,8 +91,7 @@ public class InputGateFairnessTest {
 		final SingleInputGate gate = createFairnessVerifyingInputGate(numberOfChannels);
 
 		for (int i = 0; i < numberOfChannels; i++) {
-			LocalInputChannel channel = createLocalInputChannel(gate, i, resultPartitionManager);
-			gate.setInputChannel(channel.getPartitionId().getPartitionId(), channel);
+			createLocalInputChannel(gate, i, resultPartitionManager);
 		}
 
 		// read all the buffers and the EOF event
@@ -138,8 +136,7 @@ public class InputGateFairnessTest {
 			final SingleInputGate gate = createFairnessVerifyingInputGate(numberOfChannels);
 
 			for (int i = 0; i < numberOfChannels; i++) {
-				LocalInputChannel channel = createLocalInputChannel(gate, i, resultPartitionManager);
-				gate.setInputChannel(channel.getPartitionId().getPartitionId(), channel);
+				createLocalInputChannel(gate, i, resultPartitionManager);
 			}
 
 			// seed one initial buffer
@@ -192,8 +189,6 @@ public class InputGateFairnessTest {
 				channel.onBuffer(mockBuffer, p, -1);
 			}
 			channel.onBuffer(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), buffersPerChannel, -1);
-
-			gate.setInputChannel(channel.getPartitionId().getPartitionId(), channel);
 		}
 
 		// read all the buffers and the EOF event
@@ -234,7 +229,6 @@ public class InputGateFairnessTest {
 		for (int i = 0; i < numberOfChannels; i++) {
 			RemoteInputChannel channel = createRemoteInputChannel(gate, i, connManager);
 			channels[i] = channel;
-			gate.setInputChannel(channel.getPartitionId().getPartitionId(), channel);
 		}
 
 		channels[11].onBuffer(mockBuffer, 0, -1);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java
new file mode 100644
index 0000000..549dcc5
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.LocalConnectionManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.TaskEventPublisher;
+import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
+import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Builder for various {@link InputChannel} types.
+ */
+public class InputChannelBuilder {
+	static final ConnectionID STUB_CONNECTION_ID =
+		new ConnectionID(new InetSocketAddress("localhost", 5000), 0);
+
+	private int channelIndex = 0;
+	private ResultPartitionID partitionId = new ResultPartitionID();
+	private ConnectionID connectionID = STUB_CONNECTION_ID;
+	private ResultPartitionManager partitionManager = new ResultPartitionManager();
+	private TaskEventPublisher taskEventPublisher = new TaskEventDispatcher();
+	private ConnectionManager connectionManager = new LocalConnectionManager();
+	private int initialBackoff = 0;
+	private int maxBackoff = 0;
+	private InputChannelMetrics metrics = InputChannelTestUtils.newUnregisteredInputChannelMetrics();
+
+	public static InputChannelBuilder newBuilder() {
+		return new InputChannelBuilder();
+	}
+
+	public InputChannelBuilder setChannelIndex(int channelIndex) {
+		this.channelIndex = channelIndex;
+		return this;
+	}
+
+	public InputChannelBuilder setPartitionId(ResultPartitionID partitionId) {
+		this.partitionId = partitionId;
+		return this;
+	}
+
+	public InputChannelBuilder setPartitionManager(ResultPartitionManager partitionManager) {
+		this.partitionManager = partitionManager;
+		return this;
+	}
+
+	InputChannelBuilder setTaskEventPublisher(TaskEventPublisher taskEventPublisher) {
+		this.taskEventPublisher = taskEventPublisher;
+		return this;
+	}
+
+	public InputChannelBuilder setConnectionManager(ConnectionManager connectionManager) {
+		this.connectionManager = connectionManager;
+		return this;
+	}
+
+	public InputChannelBuilder setInitialBackoff(int initialBackoff) {
+		this.initialBackoff = initialBackoff;
+		return this;
+	}
+
+	public InputChannelBuilder setMaxBackoff(int maxBackoff) {
+		this.maxBackoff = maxBackoff;
+		return this;
+	}
+
+	public InputChannelBuilder setMetrics(InputChannelMetrics metrics) {
+		this.metrics = metrics;
+		return this;
+	}
+
+	InputChannelBuilder setupFromNetworkEnvironment(NetworkEnvironment network) {
+		this.partitionManager = network.getResultPartitionManager();
+		this.connectionManager = network.getConnectionManager();
+		this.initialBackoff = network.getConfiguration().partitionRequestInitialBackoff();
+		this.maxBackoff = network.getConfiguration().partitionRequestMaxBackoff();
+		return this;
+	}
+
+	UnknownInputChannel buildUnknownAndSetToGate(SingleInputGate inputGate) {
+		UnknownInputChannel channel = new UnknownInputChannel(
+			inputGate,
+			channelIndex,
+			partitionId,
+			partitionManager,
+			taskEventPublisher,
+			connectionManager,
+			initialBackoff,
+			maxBackoff,
+			metrics);
+		inputGate.setInputChannel(partitionId.getPartitionId(), channel);
+		return channel;
+	}
+
+	public LocalInputChannel buildLocalAndSetToGate(SingleInputGate inputGate) {
+		LocalInputChannel channel = new LocalInputChannel(
+			inputGate,
+			channelIndex,
+			partitionId,
+			partitionManager,
+			taskEventPublisher,
+			initialBackoff,
+			maxBackoff,
+			metrics);
+		inputGate.setInputChannel(partitionId.getPartitionId(), channel);
+		return channel;
+	}
+
+	public RemoteInputChannel buildRemoteAndSetToGate(SingleInputGate inputGate) {
+		RemoteInputChannel channel = new RemoteInputChannel(
+			inputGate,
+			channelIndex,
+			partitionId,
+			connectionID,
+			connectionManager,
+			initialBackoff,
+			maxBackoff,
+			metrics);
+		inputGate.setInputChannel(partitionId.getPartitionId(), channel);
+		return channel;
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 4654998..0c25b24 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
-import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
@@ -36,7 +35,6 @@ import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.io.network.util.TestPartitionProducer;
 import org.apache.flink.runtime.io.network.util.TestProducerSource;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.util.function.CheckedSupplier;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -297,8 +295,6 @@ public class LocalInputChannelTest {
 
 		final LocalInputChannel channel = createLocalInputChannel(gate, partitionManager, 1, 1);
 
-		gate.setInputChannel(new IntermediateResultPartitionID(), channel);
-
 		Thread releaser = new Thread() {
 			@Override
 			public void run() {
@@ -444,15 +440,12 @@ public class LocalInputChannelTest {
 
 			// Setup input channels
 			for (int i = 0; i < numberOfInputChannels; i++) {
-				inputGate.setInputChannel(
-						new IntermediateResultPartitionID(),
-						new LocalInputChannel(
-								inputGate,
-								i,
-								consumedPartitionIds[i],
-								partitionManager,
-								taskEventDispatcher,
-								InputChannelTestUtils.newUnregisteredInputChannelMetrics()));
+				InputChannelBuilder.newBuilder()
+					.setChannelIndex(i)
+					.setPartitionManager(partitionManager)
+					.setPartitionId(consumedPartitionIds[i])
+					.setTaskEventPublisher(taskEventDispatcher)
+					.buildLocalAndSetToGate(inputGate);
 			}
 
 			this.numberOfInputChannels = numberOfInputChannels;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 1e571e7..67bc6f2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -47,8 +47,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import scala.Tuple2;
-
 import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.hasProperty;
@@ -189,28 +187,24 @@ public class RemoteInputChannelTest {
 
 	@Test(expected = IllegalStateException.class)
 	public void testRetriggerWithoutPartitionRequest() throws Exception {
-		Tuple2<Integer, Integer> backoff = new Tuple2<Integer, Integer>(500, 3000);
 		PartitionRequestClient connClient = mock(PartitionRequestClient.class);
 		SingleInputGate inputGate = mock(SingleInputGate.class);
 
-		RemoteInputChannel ch = createRemoteInputChannel(inputGate, connClient, backoff);
+		RemoteInputChannel ch = createRemoteInputChannel(inputGate, connClient, 500, 3000);
 
 		ch.retriggerSubpartitionRequest(0);
 	}
 
 	@Test
 	public void testPartitionRequestExponentialBackoff() throws Exception {
-		// Config
-		Tuple2<Integer, Integer> backoff = new Tuple2<Integer, Integer>(500, 3000);
-
 		// Start with initial backoff, then keep doubling, and cap at max.
-		int[] expectedDelays = {backoff._1(), 1000, 2000, backoff._2()};
+		int[] expectedDelays = {500, 1000, 2000, 3000};
 
 		// Setup
 		PartitionRequestClient connClient = mock(PartitionRequestClient.class);
 		SingleInputGate inputGate = mock(SingleInputGate.class);
 
-		RemoteInputChannel ch = createRemoteInputChannel(inputGate, connClient, backoff);
+		RemoteInputChannel ch = createRemoteInputChannel(inputGate, connClient, 500, 3000);
 
 		// Initial request
 		ch.requestSubpartition(0);
@@ -235,14 +229,11 @@ public class RemoteInputChannelTest {
 
 	@Test
 	public void testPartitionRequestSingleBackoff() throws Exception {
-		// Config
-		Tuple2<Integer, Integer> backoff = new Tuple2<Integer, Integer>(500, 500);
-
 		// Setup
 		PartitionRequestClient connClient = mock(PartitionRequestClient.class);
 		SingleInputGate inputGate = mock(SingleInputGate.class);
 
-		RemoteInputChannel ch = createRemoteInputChannel(inputGate, connClient, backoff);
+		RemoteInputChannel ch = createRemoteInputChannel(inputGate, connClient, 500, 500);
 
 		// No delay for first request
 		ch.requestSubpartition(0);
@@ -250,7 +241,7 @@ public class RemoteInputChannelTest {
 
 		// Initial delay for second request
 		ch.retriggerSubpartitionRequest(0);
-		verify(connClient).requestSubpartition(eq(ch.partitionId), eq(0), eq(ch), eq(backoff._1()));
+		verify(connClient).requestSubpartition(eq(ch.partitionId), eq(0), eq(ch), eq(500));
 
 		// Exception after backoff is greater than the maximum backoff.
 		try {
@@ -264,14 +255,11 @@ public class RemoteInputChannelTest {
 
 	@Test
 	public void testPartitionRequestNoBackoff() throws Exception {
-		// Config
-		Tuple2<Integer, Integer> backoff = new Tuple2<Integer, Integer>(0, 0);
-
 		// Setup
 		PartitionRequestClient connClient = mock(PartitionRequestClient.class);
 		SingleInputGate inputGate = mock(SingleInputGate.class);
 
-		RemoteInputChannel ch = createRemoteInputChannel(inputGate, connClient, backoff);
+		RemoteInputChannel ch = createRemoteInputChannel(inputGate, connClient, 0, 0);
 
 		// No delay for first request
 		ch.requestSubpartition(0);
@@ -297,13 +285,10 @@ public class RemoteInputChannelTest {
 
 		final SingleInputGate inputGate = mock(SingleInputGate.class);
 
-		final RemoteInputChannel ch = new RemoteInputChannel(
-				inputGate,
-				0,
-				partitionId,
-				mock(ConnectionID.class),
-				connectionManager,
-				InputChannelTestUtils.newUnregisteredInputChannelMetrics());
+		final RemoteInputChannel ch = InputChannelBuilder.newBuilder()
+			.setPartitionId(partitionId)
+			.setConnectionManager(connectionManager)
+			.buildRemoteAndSetToGate(inputGate);
 
 		ch.onFailedPartitionRequest();
 
@@ -346,7 +331,6 @@ public class RemoteInputChannelTest {
 
 		final SingleInputGate inputGate = createSingleInputGate(1);
 		final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
-		inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
 		Throwable thrown = null;
 		try {
 			final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
@@ -486,7 +470,6 @@ public class RemoteInputChannelTest {
 
 		final SingleInputGate inputGate = createSingleInputGate(1);
 		final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
-		inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
 		Throwable thrown = null;
 		try {
 			final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
@@ -562,7 +545,6 @@ public class RemoteInputChannelTest {
 
 		final SingleInputGate inputGate = createSingleInputGate(1);
 		final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
-		inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
 		Throwable thrown = null;
 		try {
 			final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
@@ -654,9 +636,6 @@ public class RemoteInputChannelTest {
 		final RemoteInputChannel channel1 = spy(createRemoteInputChannel(inputGate));
 		final RemoteInputChannel channel2 = spy(createRemoteInputChannel(inputGate));
 		final RemoteInputChannel channel3 = spy(createRemoteInputChannel(inputGate));
-		inputGate.setInputChannel(channel1.partitionId.getPartitionId(), channel1);
-		inputGate.setInputChannel(channel2.partitionId.getPartitionId(), channel2);
-		inputGate.setInputChannel(channel3.partitionId.getPartitionId(), channel3);
 		Throwable thrown = null;
 		try {
 			final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
@@ -724,14 +703,11 @@ public class RemoteInputChannelTest {
 
 		final SingleInputGate inputGate = createSingleInputGate(1);
 		final RemoteInputChannel successfulRemoteIC = createRemoteInputChannel(inputGate);
-		inputGate.setInputChannel(successfulRemoteIC.partitionId.getPartitionId(), successfulRemoteIC);
-
 		successfulRemoteIC.requestSubpartition(0);
 
 		// late creation -> no exclusive buffers, also no requested subpartition in successfulRemoteIC
 		// (to trigger a failure in RemoteInputChannel#notifyBufferAvailable())
 		final RemoteInputChannel failingRemoteIC = createRemoteInputChannel(inputGate);
-		inputGate.setInputChannel(failingRemoteIC.partitionId.getPartitionId(), failingRemoteIC);
 
 		Buffer buffer = null;
 		Throwable thrown = null;
@@ -789,7 +765,6 @@ public class RemoteInputChannelTest {
 
 		final SingleInputGate inputGate = createSingleInputGate(1);
 		final RemoteInputChannel inputChannel  = createRemoteInputChannel(inputGate);
-		inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
 		Throwable thrown = null;
 		try {
 			final BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
@@ -852,7 +827,6 @@ public class RemoteInputChannelTest {
 
 		final SingleInputGate inputGate = createSingleInputGate(1);
 		final RemoteInputChannel inputChannel  = createRemoteInputChannel(inputGate);
-		inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
 		Throwable thrown = null;
 		try {
 			final BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
@@ -904,7 +878,6 @@ public class RemoteInputChannelTest {
 
 		final SingleInputGate inputGate = createSingleInputGate(1);
 		final RemoteInputChannel inputChannel  = createRemoteInputChannel(inputGate);
-		inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
 		Throwable thrown = null;
 		try {
 			final BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
@@ -959,7 +932,6 @@ public class RemoteInputChannelTest {
 
 		final SingleInputGate inputGate = createSingleInputGate(1);
 		final RemoteInputChannel inputChannel  = createRemoteInputChannel(inputGate);
-		inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
 		Throwable thrown = null;
 		try {
 			final BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
@@ -1027,29 +999,25 @@ public class RemoteInputChannelTest {
 	private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate)
 			throws IOException, InterruptedException {
 
-		return createRemoteInputChannel(
-				inputGate, mock(PartitionRequestClient.class), new Tuple2<Integer, Integer>(0, 0));
+		return createRemoteInputChannel(inputGate, mock(PartitionRequestClient.class), 0, 0);
 	}
 
 	private RemoteInputChannel createRemoteInputChannel(
 			SingleInputGate inputGate,
 			PartitionRequestClient partitionRequestClient,
-			Tuple2<Integer, Integer> initialAndMaxRequestBackoff)
+			int initialBackoff,
+			int maxBackoff)
 			throws IOException, InterruptedException {
 
 		final ConnectionManager connectionManager = mock(ConnectionManager.class);
 		when(connectionManager.createPartitionRequestClient(any(ConnectionID.class)))
 				.thenReturn(partitionRequestClient);
 
-		return new RemoteInputChannel(
-			inputGate,
-			0,
-			new ResultPartitionID(),
-			mock(ConnectionID.class),
-			connectionManager,
-			initialAndMaxRequestBackoff._1(),
-			initialAndMaxRequestBackoff._2(),
-			InputChannelTestUtils.newUnregisteredInputChannelMetrics());
+		return InputChannelBuilder.newBuilder()
+			.setConnectionManager(connectionManager)
+			.setInitialBackoff(initialBackoff)
+			.setMaxBackoff(maxBackoff)
+			.buildRemoteAndSetToGate(inputGate);
 	}
 
 	/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 07fcd26..ef09d80 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -26,9 +26,6 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.ConnectionID;
-import org.apache.flink.runtime.io.network.ConnectionManager;
-import org.apache.flink.runtime.io.network.LocalConnectionManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -36,7 +33,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -52,7 +48,6 @@ import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
@@ -179,21 +174,24 @@ public class SingleInputGateTest extends InputGateTestBase {
 
 		inputGate.setBufferPool(bufferPool);
 
-		final InputChannelMetrics metrics = InputChannelTestUtils.newUnregisteredInputChannelMetrics();
-
 		// Local
 		ResultPartitionID localPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID());
 
-		InputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher, metrics);
+		InputChannelBuilder.newBuilder()
+			.setPartitionId(localPartitionId)
+			.setPartitionManager(partitionManager)
+			.setTaskEventPublisher(taskEventDispatcher)
+			.buildLocalAndSetToGate(inputGate);
 
 		// Unknown
 		ResultPartitionID unknownPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID());
 
-		InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), 0, 0, metrics);
-
-		// Set channels
-		inputGate.setInputChannel(localPartitionId.getPartitionId(), local);
-		inputGate.setInputChannel(unknownPartitionId.getPartitionId(), unknown);
+		InputChannelBuilder.newBuilder()
+			.setChannelIndex(1)
+			.setPartitionId(unknownPartitionId)
+			.setPartitionManager(partitionManager)
+			.setTaskEventPublisher(taskEventDispatcher)
+			.buildUnknownAndSetToGate(inputGate);
 
 		// Request partitions
 		inputGate.requestPartitions();
@@ -227,18 +225,9 @@ public class SingleInputGateTest extends InputGateTestBase {
 
 		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 
-		InputChannel unknown = new UnknownInputChannel(
-			inputGate,
-			0,
-			new ResultPartitionID(),
-			partitionManager,
-			new TaskEventDispatcher(),
-			new LocalConnectionManager(),
-			0,
-			0,
-			InputChannelTestUtils.newUnregisteredInputChannelMetrics());
-
-		inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);
+		InputChannel unknown = InputChannelBuilder.newBuilder()
+			.setPartitionManager(partitionManager)
+			.buildUnknownAndSetToGate(inputGate);
 
 		// Update to a local channel and verify that no request is triggered
 		inputGate.updateInputChannel(new InputChannelDeploymentDescriptor(
@@ -260,18 +249,7 @@ public class SingleInputGateTest extends InputGateTestBase {
 		// Setup the input gate with a single channel that does nothing
 		final SingleInputGate inputGate = createInputGate(1);
 
-		InputChannel unknown = new UnknownInputChannel(
-			inputGate,
-			0,
-			new ResultPartitionID(),
-			new ResultPartitionManager(),
-			new TaskEventDispatcher(),
-			new LocalConnectionManager(),
-			0,
-			0,
-			InputChannelTestUtils.newUnregisteredInputChannelMetrics());
-
-		inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);
+		InputChannelBuilder.newBuilder().buildUnknownAndSetToGate(inputGate);
 
 		// Start the consumer in a separate Thread
 		Thread asyncConsumer = new Thread() {
@@ -335,7 +313,7 @@ public class SingleInputGateTest extends InputGateTestBase {
 			// Remote
 			new InputChannelDeploymentDescriptor(
 				partitionIds[1],
-				ResultPartitionLocation.createRemote(new ConnectionID(new InetSocketAddress("localhost", 5000), 0))),
+				ResultPartitionLocation.createRemote(InputChannelBuilder.STUB_CONNECTION_ID)),
 			// Unknown
 			new InputChannelDeploymentDescriptor(
 				partitionIds[2],
@@ -415,16 +393,14 @@ public class SingleInputGateTest extends InputGateTestBase {
 		final NetworkEnvironment network = createNetworkEnvironment();
 
 		try {
-			final ResultPartitionID resultPartitionId = new ResultPartitionID();
-			final ConnectionID connectionId = new ConnectionID(new InetSocketAddress("localhost", 5000), 0);
-			addRemoteInputChannel(network, inputGate, connectionId, resultPartitionId, 0);
-
+			RemoteInputChannel remote =
+				InputChannelBuilder.newBuilder()
+					.setupFromNetworkEnvironment(network)
+					.buildRemoteAndSetToGate(inputGate);
 			network.setupInputGate(inputGate);
 
 			NetworkBufferPool bufferPool = network.getNetworkBufferPool();
 			if (enableCreditBasedFlowControl) {
-				RemoteInputChannel remote = (RemoteInputChannel) inputGate.getInputChannels()
-					.get(resultPartitionId.getPartitionId());
 				// only the exclusive buffers should be assigned/available now
 				assertEquals(buffersPerChannel, remote.getNumberOfAvailableBuffers());
 
@@ -469,10 +445,9 @@ public class SingleInputGateTest extends InputGateTestBase {
 			}
 
 			// Trigger updates to remote input channel from unknown input channel
-			final ConnectionID connectionId = new ConnectionID(new InetSocketAddress("localhost", 5000), 0);
 			inputGate.updateInputChannel(new InputChannelDeploymentDescriptor(
 				resultPartitionId,
-				ResultPartitionLocation.createRemote(connectionId)));
+				ResultPartitionLocation.createRemote(InputChannelBuilder.STUB_CONNECTION_ID)));
 
 			if (enableCreditBasedFlowControl) {
 				RemoteInputChannel remote = (RemoteInputChannel) inputGate.getInputChannels()
@@ -517,10 +492,9 @@ public class SingleInputGateTest extends InputGateTestBase {
 				is(instanceOf((UnknownInputChannel.class))));
 
 			// Trigger updates to remote input channel from unknown input channel
-			final ConnectionID remoteConnectionId = new ConnectionID(new InetSocketAddress("localhost", 5000), 0);
 			inputGate.updateInputChannel(new InputChannelDeploymentDescriptor(
 				remoteResultPartitionId,
-				ResultPartitionLocation.createRemote(remoteConnectionId)));
+				ResultPartitionLocation.createRemote(InputChannelBuilder.STUB_CONNECTION_ID)));
 
 			assertThat(inputGate.getInputChannels().get(remoteResultPartitionId.getPartitionId()),
 				is(instanceOf((RemoteInputChannel.class))));
@@ -549,38 +523,11 @@ public class SingleInputGateTest extends InputGateTestBase {
 			SingleInputGate inputGate,
 			ResultPartitionID partitionId,
 			int channelIndex) {
-		UnknownInputChannel unknown =
-			createUnknownInputChannel(network, inputGate, partitionId, channelIndex);
-		inputGate.setInputChannel(partitionId.getPartitionId(), unknown);
-	}
-
-	private UnknownInputChannel createUnknownInputChannel(
-			NetworkEnvironment network,
-			SingleInputGate inputGate,
-			ResultPartitionID partitionId,
-			int channelIndex) {
-		return new UnknownInputChannel(
-			inputGate,
-			channelIndex,
-			partitionId,
-			network.getResultPartitionManager(),
-			new TaskEventDispatcher(),
-			network.getConnectionManager(),
-			network.getConfiguration().partitionRequestInitialBackoff(),
-			network.getConfiguration().partitionRequestMaxBackoff(),
-			InputChannelTestUtils.newUnregisteredInputChannelMetrics());
-	}
-
-	private void addRemoteInputChannel(
-			NetworkEnvironment network,
-			SingleInputGate inputGate,
-			ConnectionID connectionId,
-			ResultPartitionID partitionId,
-			int channelIndex) {
-		RemoteInputChannel remote =
-			createUnknownInputChannel(network, inputGate, partitionId, channelIndex)
-				.toRemoteInputChannel(connectionId);
-		inputGate.setInputChannel(partitionId.getPartitionId(), remote);
+		InputChannelBuilder.newBuilder()
+			.setChannelIndex(channelIndex)
+			.setPartitionId(partitionId)
+			.setupFromNetworkEnvironment(network)
+			.buildUnknownAndSetToGate(inputGate);
 	}
 
 	private NetworkEnvironment createNetworkEnvironment() {