You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/09 12:46:15 UTC

[flink] 04/05: [FLINK-13013][network] Request partitions during InputGate#setup

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c27d0d68dc76ef4818f12296b28d78d6a78d4965
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Mon Jul 1 11:57:33 2019 +0200

    [FLINK-13013][network] Request partitions during InputGate#setup
    
    Before partitions were being requested on first polling/getting next buffer
    which was causing a couple of issues:
    - it was a little bit confusing
    - after first requestPartitions call, this was causing unnecessary synchronisation overhead
    - this was preventing data notifications to come through and isAvailable() future was always not
      completed before the first attempt to read the data from the input gate
    
    This commit moves requesting partitions to InputGate#setup solving those issues.
---
 .../io/network/partition/consumer/InputGate.java   |  2 +-
 .../consumer/RemoteChannelStateChecker.java        |  9 +-
 .../partition/consumer/SingleInputGate.java        |  5 +-
 .../network/partition/consumer/UnionInputGate.java |  3 -
 .../runtime/taskmanager/InputGateWithMetrics.java  |  2 +-
 .../org/apache/flink/runtime/taskmanager/Task.java |  4 +-
 .../io/network/NettyShuffleEnvironmentTest.java    | 30 +++----
 .../runtime/io/network/buffer/NoOpBufferPool.java  | 95 ++++++++++++++++++++++
 .../network/partition/InputChannelTestUtils.java   | 21 +++++
 .../network/partition/InputGateFairnessTest.java   | 29 ++++++-
 .../io/network/partition/PartitionTestUtils.java   |  2 +
 .../consumer/InputBuffersMetricsTest.java          | 36 ++++++--
 .../partition/consumer/LocalInputChannelTest.java  |  8 +-
 .../partition/consumer/SingleInputGateBuilder.java |  5 ++
 .../partition/consumer/SingleInputGateTest.java    | 23 +++++-
 .../apache/flink/runtime/taskmanager/TaskTest.java | 26 ++++--
 .../StreamNetworkBenchmarkEnvironment.java         |  3 +-
 .../StreamNetworkPointToPointBenchmark.java        |  2 +-
 .../StreamNetworkThroughputBenchmark.java          |  2 +-
 19 files changed, 252 insertions(+), 55 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 0ce446b..e9f2399 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -130,5 +130,5 @@ public abstract class InputGate implements AsyncDataInput<BufferOrEvent>, AutoCl
 	/**
 	 * Setup gate, potentially heavy-weight, blocking operation comparing to just creation.
 	 */
-	public abstract void setup() throws IOException;
+	public abstract void setup() throws IOException, InterruptedException;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteChannelStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteChannelStateChecker.java
index 69ee3fd..4bcfb4c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteChannelStateChecker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteChannelStateChecker.java
@@ -46,7 +46,8 @@ public class RemoteChannelStateChecker {
 
 	public boolean isProducerReadyOrAbortConsumption(ResponseHandle responseHandle) {
 		Either<ExecutionState, Throwable> result = responseHandle.getProducerExecutionState();
-		if (responseHandle.getConsumerExecutionState() != ExecutionState.RUNNING) {
+		ExecutionState consumerExecutionState = responseHandle.getConsumerExecutionState();
+		if (!isConsumerStateValidForConsumption(consumerExecutionState)) {
 			LOG.debug(
 				"Ignore a partition producer state notification for task {}, because it's not running.",
 				taskNameWithSubtask);
@@ -64,6 +65,12 @@ public class RemoteChannelStateChecker {
 		return false;
 	}
 
+	private static boolean isConsumerStateValidForConsumption(
+			ExecutionState consumerExecutionState) {
+		return consumerExecutionState == ExecutionState.RUNNING ||
+			consumerExecutionState == ExecutionState.DEPLOYING;
+	}
+
 	private boolean isProducerConsumerReady(ResponseHandle responseHandle) {
 		ExecutionState producerState = getProducerState(responseHandle);
 		return producerState == ExecutionState.SCHEDULED ||
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index b23572d..696dbe8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -202,7 +202,7 @@ public class SingleInputGate extends InputGate {
 	}
 
 	@Override
-	public void setup() throws IOException {
+	public void setup() throws IOException, InterruptedException {
 		checkState(this.bufferPool == null, "Bug in input gate setup logic: Already registered buffer pool.");
 		if (isCreditBased) {
 			// assign exclusive buffers to input channels directly and use the rest for floating buffers
@@ -211,6 +211,8 @@ public class SingleInputGate extends InputGate {
 
 		BufferPool bufferPool = bufferPoolFactory.get();
 		setBufferPool(bufferPool);
+
+		requestPartitions();
 	}
 
 	// ------------------------------------------------------------------------
@@ -481,7 +483,6 @@ public class SingleInputGate extends InputGate {
 			throw new IllegalStateException("Released");
 		}
 
-		requestPartitions();
 		Optional<InputWithData<InputChannel, BufferAndAvailability>> next = waitAndGetNextData(blocking);
 		if (!next.isPresent()) {
 			return Optional.empty();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 65a15ff..2b5b5c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -167,9 +167,6 @@ public class UnionInputGate extends InputGate {
 			return Optional.empty();
 		}
 
-		// Make sure to request the partitions, if they have not been requested before.
-		requestPartitions();
-
 		Optional<InputWithData<InputGate, BufferOrEvent>> next = waitAndGetNextData(blocking);
 		if (!next.isPresent()) {
 			return Optional.empty();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
index 669c02e..27d01d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
@@ -61,7 +61,7 @@ public class InputGateWithMetrics extends InputGate {
 	}
 
 	@Override
-	public void setup() throws IOException {
+	public void setup() throws IOException, InterruptedException {
 		inputGate.setup();
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 62886a5..4355821 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -834,12 +834,14 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
 
 	@VisibleForTesting
 	public static void setupPartitionsAndGates(
-		ResultPartitionWriter[] producedPartitions, InputGate[] inputGates) throws IOException {
+		ResultPartitionWriter[] producedPartitions, InputGate[] inputGates) throws IOException, InterruptedException {
 
 		for (ResultPartitionWriter partition : producedPartitions) {
 			partition.setup();
 		}
 
+		// InputGates must be initialized after the partitions, since during InputGate#setup
+		// we are requesting partitions
 		for (InputGate gate : inputGates) {
 			gate.setup();
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
index ab847ad..ba70a47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
@@ -155,23 +155,19 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
 		SingleInputGate ig4 = createSingleInputGate(network, ResultPartitionType.PIPELINED_BOUNDED, 4);
 		final SingleInputGate[] inputGates = new SingleInputGate[] {ig1, ig2, ig3, ig4};
 
-		// set up remote input channels for the exclusive buffers of the credit-based flow control
-		// (note that this does not obey the partition types which is ok for the scope of the test)
-		if (enableCreditBasedFlowControl) {
-			createRemoteInputChannel(ig4, 0, rp1, connManager, network.getNetworkBufferPool());
-			createRemoteInputChannel(ig4, 0, rp2, connManager, network.getNetworkBufferPool());
-			createRemoteInputChannel(ig4, 0, rp3, connManager, network.getNetworkBufferPool());
-			createRemoteInputChannel(ig4, 0, rp4, connManager, network.getNetworkBufferPool());
-
-			createRemoteInputChannel(ig1, 1, rp1, connManager, network.getNetworkBufferPool());
-			createRemoteInputChannel(ig1, 1, rp4, connManager, network.getNetworkBufferPool());
-
-			createRemoteInputChannel(ig2, 1, rp2, connManager, network.getNetworkBufferPool());
-			createRemoteInputChannel(ig2, 2, rp4, connManager, network.getNetworkBufferPool());
-
-			createRemoteInputChannel(ig3, 1, rp3, connManager, network.getNetworkBufferPool());
-			createRemoteInputChannel(ig3, 3, rp4, connManager, network.getNetworkBufferPool());
-		}
+		createRemoteInputChannel(ig4, 0, rp1, connManager, network.getNetworkBufferPool());
+		createRemoteInputChannel(ig4, 0, rp2, connManager, network.getNetworkBufferPool());
+		createRemoteInputChannel(ig4, 0, rp3, connManager, network.getNetworkBufferPool());
+		createRemoteInputChannel(ig4, 0, rp4, connManager, network.getNetworkBufferPool());
+
+		createRemoteInputChannel(ig1, 1, rp1, connManager, network.getNetworkBufferPool());
+		createRemoteInputChannel(ig1, 1, rp4, connManager, network.getNetworkBufferPool());
+
+		createRemoteInputChannel(ig2, 1, rp2, connManager, network.getNetworkBufferPool());
+		createRemoteInputChannel(ig2, 2, rp4, connManager, network.getNetworkBufferPool());
+
+		createRemoteInputChannel(ig3, 1, rp3, connManager, network.getNetworkBufferPool());
+		createRemoteInputChannel(ig3, 3, rp4, connManager, network.getNetworkBufferPool());
 
 		Task.setupPartitionsAndGates(resultPartitions, inputGates);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
new file mode 100644
index 0000000..04c9c04
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
@@ -0,0 +1,95 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+// We have it in this package because we could not mock the methods otherwise
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+
+/**
+ * No-op implementation of {@link BufferPool}.
+ */
+public class NoOpBufferPool implements BufferPool {
+
+	@Override
+	public void lazyDestroy() {
+	}
+
+	@Override
+	public Buffer requestBuffer() throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public Buffer requestBufferBlocking() throws IOException, InterruptedException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public BufferBuilder requestBufferBuilderBlocking() throws IOException, InterruptedException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean addBufferListener(BufferListener listener) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean isDestroyed() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public int getNumberOfRequiredMemorySegments() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public int getMaxNumberOfMemorySegments() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public int getNumBuffers() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void setNumBuffers(int numBuffers) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public int getNumberOfAvailableMemorySegments() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public int bestEffortGetNumOfUsedBuffers() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void recycle(MemorySegment memorySegment) {
+		throw new UnsupportedOperationException();
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index 16d6cab..7805ef8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.core.memory.MemorySegmentProvider;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
@@ -195,4 +196,24 @@ public class InputChannelTestUtils {
 		public void recycleMemorySegments(Collection<MemorySegment> segments) {
 		}
 	}
+
+	/**
+	 * {@link MemorySegmentProvider} that provides unpooled {@link MemorySegment}s.
+	 */
+	public static class UnpooledMemorySegmentProvider implements MemorySegmentProvider {
+		private final int pageSize;
+
+		public UnpooledMemorySegmentProvider(int pageSize) {
+			this.pageSize = pageSize;
+		}
+
+		@Override
+		public Collection<MemorySegment> requestMemorySegments() {
+			return Collections.singletonList(MemorySegmentFactory.allocateUnpooledSegment(pageSize));
+		}
+
+		@Override
+		public void recycleMemorySegments(Collection<MemorySegment> segments) {
+		}
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index 2bf5a09..da05f83 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -24,8 +24,11 @@ import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NoOpBufferPool;
+import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.UnpooledMemorySegmentProvider;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
@@ -47,7 +50,6 @@ import java.util.Optional;
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
 import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createDummyConnectionManager;
 import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createLocalInputChannel;
-import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createRemoteInputChannel;
 import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createResultPartitionManager;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -93,6 +95,8 @@ public class InputGateFairnessTest {
 			createLocalInputChannel(gate, i, resultPartitionManager);
 		}
 
+		gate.setup();
+
 		// read all the buffers and the EOF event
 		for (int i = numberOfChannels * (buffersPerChannel + 1); i > 0; --i) {
 			assertNotNull(gate.getNext());
@@ -141,6 +145,8 @@ public class InputGateFairnessTest {
 			// seed one initial buffer
 			sources[12].add(bufferConsumer.copy());
 
+			gate.setup();
+
 			// read all the buffers and the EOF event
 			for (int i = 0; i < numberOfChannels * buffersPerChannel; i++) {
 				assertNotNull(gate.getNext());
@@ -190,6 +196,8 @@ public class InputGateFairnessTest {
 			channel.onBuffer(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), buffersPerChannel, -1);
 		}
 
+		gate.setup();
+
 		// read all the buffers and the EOF event
 		for (int i = numberOfChannels * (buffersPerChannel + 1); i > 0; --i) {
 			assertNotNull(gate.getNext());
@@ -233,6 +241,8 @@ public class InputGateFairnessTest {
 		channels[11].onBuffer(mockBuffer, 0, -1);
 		channelSequenceNums[11]++;
 
+		gate.setup();
+
 		// read all the buffers and the EOF event
 		for (int i = 0; i < numberOfChannels * buffersPerChannel; i++) {
 			assertNotNull(gate.getNext());
@@ -308,9 +318,8 @@ public class InputGateFairnessTest {
 	// ------------------------------------------------------------------------
 
 	private static class FairnessVerifyingInputGate extends SingleInputGate {
-		private static final SupplierWithException<BufferPool, IOException> STUB_BUFFER_POOL_FACTORY = () -> {
-			throw new UnsupportedOperationException();
-		};
+		private static final SupplierWithException<BufferPool, IOException> STUB_BUFFER_POOL_FACTORY =
+			NoOpBufferPool::new;
 
 		private final ArrayDeque<InputChannel> channelsWithData;
 
@@ -368,4 +377,16 @@ public class InputGateFairnessTest {
 			uniquenessChecker.clear();
 		}
 	}
+
+	public static RemoteInputChannel createRemoteInputChannel(
+		SingleInputGate inputGate,
+		int channelIndex,
+		ConnectionManager connectionManager) {
+
+		return InputChannelBuilder.newBuilder()
+			.setChannelIndex(channelIndex)
+			.setConnectionManager(connectionManager)
+			.setMemorySegmentProvider(new UnpooledMemorySegmentProvider(32 * 1024))
+			.buildRemoteAndSetToGate(inputGate);
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index e559659..5e39a43 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -63,6 +63,7 @@ public class PartitionTestUtils {
 			ResultPartitionType partitionType,
 			int numChannels) {
 		return new ResultPartitionBuilder()
+			.setResultPartitionManager(environment.getResultPartitionManager())
 			.setupBufferPoolFactoryFromNettyShuffleEnvironment(environment)
 			.setResultPartitionType(partitionType)
 			.setNumberOfSubpartitions(numChannels)
@@ -75,6 +76,7 @@ public class PartitionTestUtils {
 			ResultPartitionType partitionType,
 			int numChannels) {
 		return new ResultPartitionBuilder()
+			.setResultPartitionManager(environment.getResultPartitionManager())
 			.setupBufferPoolFactoryFromNettyShuffleEnvironment(environment)
 			.setFileChannelManager(channelManager)
 			.setResultPartitionType(partitionType)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
index 8d868cc..9b17a2e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
@@ -27,6 +27,8 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
 import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
 import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.util.TestLogger;
 
@@ -58,7 +60,7 @@ public class InputBuffersMetricsTest extends TestLogger {
 	}
 
 	@Test
-	public void testCalculateTotalBuffersSize() throws IOException {
+	public void testCalculateTotalBuffersSize() throws Exception {
 		int numberOfRemoteChannels = 2;
 		int numberOfLocalChannels = 0;
 
@@ -76,6 +78,7 @@ public class InputBuffersMetricsTest extends TestLogger {
 			numberOfRemoteChannels,
 			numberOfLocalChannels).f0;
 		closeableRegistry.registerCloseable(inputGate1::close);
+		inputGate1.setup();
 
 		SingleInputGate[] inputGates = new SingleInputGate[]{inputGate1};
 		FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
@@ -94,7 +97,7 @@ public class InputBuffersMetricsTest extends TestLogger {
 	}
 
 	@Test
-	public void testExclusiveBuffersUsage() throws IOException {
+	public void testExclusiveBuffersUsage() throws Exception {
 		int numberOfRemoteChannelsGate1 = 2;
 		int numberOfLocalChannelsGate1 = 0;
 		int numberOfRemoteChannelsGate2 = 1;
@@ -124,6 +127,8 @@ public class InputBuffersMetricsTest extends TestLogger {
 		SingleInputGate inputGate2 = tuple2.f0;
 		closeableRegistry.registerCloseable(inputGate1::close);
 		closeableRegistry.registerCloseable(inputGate2::close);
+		inputGate1.setup();
+		inputGate2.setup();
 
 		List<RemoteInputChannel> remoteInputChannels = tuple1.f1;
 
@@ -155,7 +160,7 @@ public class InputBuffersMetricsTest extends TestLogger {
 	}
 
 	@Test
-	public void testFloatingBuffersUsage() throws IOException, InterruptedException {
+	public void testFloatingBuffersUsage() throws Exception {
 
 		int numberOfRemoteChannelsGate1 = 2;
 		int numberOfLocalChannelsGate1 = 0;
@@ -185,6 +190,8 @@ public class InputBuffersMetricsTest extends TestLogger {
 		SingleInputGate inputGate1 = tuple1.f0;
 		closeableRegistry.registerCloseable(inputGate1::close);
 		closeableRegistry.registerCloseable(inputGate2::close);
+		inputGate1.setup();
+		inputGate2.setup();
 
 		RemoteInputChannel remoteInputChannel1 = tuple1.f1.get(0);
 
@@ -250,7 +257,7 @@ public class InputBuffersMetricsTest extends TestLogger {
 	private Tuple2<SingleInputGate, List<RemoteInputChannel>> buildInputGate(
 		NettyShuffleEnvironment network,
 		int numberOfRemoteChannels,
-		int numberOfLocalChannels) throws IOException {
+		int numberOfLocalChannels) throws Exception {
 
 		SingleInputGate inputGate = new SingleInputGateBuilder()
 			.setNumberOfChannels(numberOfRemoteChannels + numberOfLocalChannels)
@@ -262,22 +269,31 @@ public class InputBuffersMetricsTest extends TestLogger {
 
 		int channelIdx = 0;
 		for (int i = 0; i < numberOfRemoteChannels; i++) {
-			res.f1.add(buildRemoteChannel(channelIdx, inputGate, network));
+			ResultPartition partition = PartitionTestUtils.createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 1);
+			closeableRegistry.registerCloseable(partition::close);
+			partition.setup();
+
+			res.f1.add(buildRemoteChannel(channelIdx, inputGate, network, partition));
 			channelIdx++;
 		}
 
 		for (int i = 0; i < numberOfLocalChannels; i++) {
-			buildLocalChannel(channelIdx, inputGate, network);
+			ResultPartition partition = PartitionTestUtils.createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 1);
+			closeableRegistry.registerCloseable(partition::close);
+			partition.setup();
+
+			buildLocalChannel(channelIdx, inputGate, network, partition);
 		}
-		inputGate.setup();
 		return res;
 	}
 
 	private RemoteInputChannel buildRemoteChannel(
 		int channelIndex,
 		SingleInputGate inputGate,
-		NettyShuffleEnvironment network) {
+		NettyShuffleEnvironment network,
+		ResultPartition partition) {
 		return new InputChannelBuilder()
+			.setPartitionId(partition.getPartitionId())
 			.setChannelIndex(channelIndex)
 			.setupFromNettyShuffleEnvironment(network)
 			.setConnectionManager(new TestingConnectionManager())
@@ -287,8 +303,10 @@ public class InputBuffersMetricsTest extends TestLogger {
 	private void buildLocalChannel(
 		int channelIndex,
 		SingleInputGate inputGate,
-		NettyShuffleEnvironment network) {
+		NettyShuffleEnvironment network,
+		ResultPartition partition) {
 		new InputChannelBuilder()
+			.setPartitionId(partition.getPartitionId())
 			.setChannelIndex(channelIndex)
 			.setupFromNettyShuffleEnvironment(network)
 			.setConnectionManager(new TestingConnectionManager())
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 72404ce..fd7cdd1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -493,7 +493,7 @@ public class LocalInputChannelTest {
 				BufferPool bufferPool,
 				ResultPartitionManager partitionManager,
 				TaskEventDispatcher taskEventDispatcher,
-				ResultPartitionID[] consumedPartitionIds) {
+				ResultPartitionID[] consumedPartitionIds) throws IOException, InterruptedException {
 
 			checkArgument(numberOfInputChannels >= 1);
 			checkArgument(numberOfExpectedBuffersPerChannel >= 1);
@@ -501,11 +501,9 @@ public class LocalInputChannelTest {
 			this.inputGate = new SingleInputGateBuilder()
 				.setConsumedSubpartitionIndex(subpartitionIndex)
 				.setNumberOfChannels(numberOfInputChannels)
+				.setBufferPoolFactory(bufferPool)
 				.build();
 
-			// Set buffer pool
-			inputGate.setBufferPool(bufferPool);
-
 			// Setup input channels
 			for (int i = 0; i < numberOfInputChannels; i++) {
 				InputChannelBuilder.newBuilder()
@@ -516,6 +514,8 @@ public class LocalInputChannelTest {
 					.buildLocalAndSetToGate(inputGate);
 			}
 
+			inputGate.setup();
+
 			this.numberOfInputChannels = numberOfInputChannels;
 			this.numberOfExpectedBuffersPerChannel = numberOfExpectedBuffersPerChannel;
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
index b23af06..956bad9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
@@ -87,6 +87,11 @@ public class SingleInputGateBuilder {
 		return this;
 	}
 
+	public SingleInputGateBuilder setBufferPoolFactory(BufferPool bufferPool) {
+		this.bufferPoolFactory = () -> bufferPool;
+		return this;
+	}
+
 	public SingleInputGate build() {
 		return new SingleInputGate(
 			"Single Input Gate",
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index d737949..cad957f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
@@ -34,6 +35,8 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -419,6 +422,7 @@ public class SingleInputGateTest extends InputGateTestBase {
 			RemoteInputChannel remote =
 				InputChannelBuilder.newBuilder()
 					.setupFromNettyShuffleEnvironment(network)
+					.setConnectionManager(new TestingConnectionManager())
 					.buildRemoteAndSetToGate(inputGate);
 			inputGate.setup();
 
@@ -498,13 +502,27 @@ public class SingleInputGateTest extends InputGateTestBase {
 	@Test
 	public void testUpdateUnknownInputChannel() throws Exception {
 		final NettyShuffleEnvironment network = createNettyShuffleEnvironment();
+
+		final ResultPartition localResultPartition = new ResultPartitionBuilder()
+			.setResultPartitionManager(network.getResultPartitionManager())
+			.setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
+			.build();
+
+		final ResultPartition remoteResultPartition = new ResultPartitionBuilder()
+			.setResultPartitionManager(network.getResultPartitionManager())
+			.setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
+			.build();
+
+		localResultPartition.setup();
+		remoteResultPartition.setup();
+
 		final SingleInputGate inputGate = createInputGate(network, 2, ResultPartitionType.PIPELINED);
 
 		try {
-			final ResultPartitionID localResultPartitionId = new ResultPartitionID();
+			final ResultPartitionID localResultPartitionId = localResultPartition.getPartitionId();
 			addUnknownInputChannel(network, inputGate, localResultPartitionId, 0);
 
-			final ResultPartitionID remoteResultPartitionId = new ResultPartitionID();
+			final ResultPartitionID remoteResultPartitionId = remoteResultPartition.getPartitionId();
 			addUnknownInputChannel(network, inputGate, remoteResultPartitionId, 1);
 
 			inputGate.setup();
@@ -628,6 +646,7 @@ public class SingleInputGateTest extends InputGateTestBase {
 			.setChannelIndex(channelIndex)
 			.setPartitionId(partitionId)
 			.setupFromNettyShuffleEnvironment(network)
+			.setConnectionManager(new TestingConnectionManager())
 			.buildUnknownAndSetToGate(inputGate);
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 46af3e5..ee78963 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -565,7 +565,21 @@ public class TaskTest extends TestLogger {
 	}
 
 	@Test
-	public void testOnPartitionStateUpdate() throws Exception {
+	public void testOnPartitionStateUpdateWhileRunning() throws Exception {
+		testOnPartitionStateUpdate(ExecutionState.RUNNING);
+	}
+
+	/**
+	 * Partition state updates can also happen when {@link Task} is in
+	 * {@link ExecutionState#DEPLOYING} state, because we are requesting for partitions during
+	 * setting up input gates.
+	 */
+	@Test
+	public void testOnPartitionStateUpdateWhileDeploying() throws Exception {
+		testOnPartitionStateUpdate(ExecutionState.DEPLOYING);
+	}
+
+	public void testOnPartitionStateUpdate(ExecutionState initialTaskState) throws Exception {
 		final ResultPartitionID partitionId = new ResultPartitionID();
 
 		final Task task = createTaskBuilder()
@@ -583,10 +597,10 @@ public class TaskTest extends TestLogger {
 			expected.put(state, ExecutionState.FAILED);
 		}
 
-		expected.put(ExecutionState.RUNNING, ExecutionState.RUNNING);
-		expected.put(ExecutionState.SCHEDULED, ExecutionState.RUNNING);
-		expected.put(ExecutionState.DEPLOYING, ExecutionState.RUNNING);
-		expected.put(ExecutionState.FINISHED, ExecutionState.RUNNING);
+		expected.put(ExecutionState.RUNNING, initialTaskState);
+		expected.put(ExecutionState.SCHEDULED, initialTaskState);
+		expected.put(ExecutionState.DEPLOYING, initialTaskState);
+		expected.put(ExecutionState.FINISHED, initialTaskState);
 
 		expected.put(ExecutionState.CANCELED, ExecutionState.CANCELING);
 		expected.put(ExecutionState.CANCELING, ExecutionState.CANCELING);
@@ -594,7 +608,7 @@ public class TaskTest extends TestLogger {
 
 		int producingStateCounter = 0;
 		for (ExecutionState state : ExecutionState.values()) {
-			setState(task, ExecutionState.RUNNING);
+			setState(task, initialTaskState);
 
 			if (checker.isProducerReadyOrAbortConsumption(task.new PartitionProducerStateResponseHandle(state, null))) {
 				producingStateCounter++;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index 0cdc658..8351b3c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -50,7 +50,6 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
 import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
 
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Arrays;
@@ -231,7 +230,7 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 		return consumableNotifyingPartitionWriter;
 	}
 
-	private InputGate createInputGate(TaskManagerLocation senderLocation) throws IOException {
+	private InputGate createInputGate(TaskManagerLocation senderLocation) throws Exception {
 		InputGate[] gates = new InputGate[channels];
 		for (int channel = 0; channel < channels; ++channel) {
 			final InputGateDeploymentDescriptor gateDescriptor = createInputGateDeploymentDescriptor(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
index 7488688..bb6b9e2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
@@ -77,8 +77,8 @@ public class StreamNetworkPointToPointBenchmark {
 		environment = new StreamNetworkBenchmarkEnvironment<>();
 		environment.setUp(1, 1, false, false, -1, -1, config);
 
-		receiver = environment.createReceiver();
 		recordWriter = environment.createRecordWriter(0, flushTimeout);
+		receiver = environment.createReceiver();
 	}
 
 	/**
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
index 0586f54..b5d1c07 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
@@ -110,7 +110,6 @@ public class StreamNetworkThroughputBenchmark {
 			senderBufferPoolSize,
 			receiverBufferPoolSize,
 			config);
-		receiver = environment.createReceiver();
 		writerThreads = new LongRecordWriterThread[recordWriters];
 		for (int writer = 0; writer < recordWriters; writer++) {
 			writerThreads[writer] = new LongRecordWriterThread(
@@ -118,6 +117,7 @@ public class StreamNetworkThroughputBenchmark {
 				broadcastMode);
 			writerThreads[writer].start();
 		}
+		receiver = environment.createReceiver();
 	}
 
 	/**