You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/09 12:46:11 UTC

[flink] branch master updated (fd75bcd -> 8d277d4)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from fd75bcd  [hotfix][tests][coordination] Move idle task manager release tests into a separate suite
     new a68dc3c  [hotfix][test] Guarantee order of CloseableRegistry
     new 479b689  [hotfix][test] Move CloseableRegistry as field in InputBuffersMetricsTest
     new 282646e  [hotfix][test] Deduplicate NettyShuffleEnvironmentTest code
     new c27d0d6  [FLINK-13013][network] Request partitions during InputGate#setup
     new 8d277d4  [hotfix][network] Make InputGate#requestPartitions a private method

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/flink/core/fs/CloseableRegistry.java    |  16 ++-
 .../flink/util/AbstractCloseableRegistry.java      |   8 +-
 .../io/network/partition/consumer/InputGate.java   |   4 +-
 .../consumer/RemoteChannelStateChecker.java        |   9 +-
 .../partition/consumer/SingleInputGate.java        |  55 ++++----
 .../network/partition/consumer/UnionInputGate.java |  17 ---
 .../runtime/taskmanager/InputGateWithMetrics.java  |   7 +-
 .../org/apache/flink/runtime/taskmanager/Task.java |   4 +-
 .../io/network/NettyShuffleEnvironmentBuilder.java |   4 +-
 .../io/network/NettyShuffleEnvironmentTest.java    |  91 +++----------
 .../runtime/io/network/buffer/NoOpBufferPool.java  |  95 +++++++++++++
 .../network/partition/InputChannelTestUtils.java   |  21 +++
 .../network/partition/InputGateFairnessTest.java   |  29 +++-
 .../io/network/partition/PartitionTestUtils.java   |   2 +
 .../consumer/InputBuffersMetricsTest.java          | 149 ++++++++++++---------
 .../partition/consumer/LocalInputChannelTest.java  |   8 +-
 .../partition/consumer/SingleInputGateBuilder.java |   5 +
 .../partition/consumer/SingleInputGateTest.java    |  95 +++++++------
 .../apache/flink/runtime/taskmanager/TaskTest.java |  26 +++-
 .../CheckpointBarrierAlignerMassiveRandomTest.java |   3 -
 .../flink/streaming/runtime/io/MockInputGate.java  |   4 -
 .../StreamNetworkBenchmarkEnvironment.java         |   3 +-
 .../StreamNetworkPointToPointBenchmark.java        |   2 +-
 .../StreamNetworkThroughputBenchmark.java          |   2 +-
 24 files changed, 400 insertions(+), 259 deletions(-)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java


[flink] 05/05: [hotfix][network] Make InputGate#requestPartitions a private method

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8d277d42269fbc8bc02b69c7b5dfe1c51916ea89
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Mon Jul 1 12:14:58 2019 +0200

    [hotfix][network] Make InputGate#requestPartitions a private method
---
 .../io/network/partition/consumer/InputGate.java   |  2 -
 .../partition/consumer/SingleInputGate.java        | 50 +++++++--------
 .../network/partition/consumer/UnionInputGate.java | 14 -----
 .../runtime/taskmanager/InputGateWithMetrics.java  |  5 --
 .../partition/consumer/SingleInputGateTest.java    | 72 +++++++++++-----------
 .../CheckpointBarrierAlignerMassiveRandomTest.java |  3 -
 .../flink/streaming/runtime/io/MockInputGate.java  |  4 --
 7 files changed, 62 insertions(+), 88 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index e9f2399..7d6ea81 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -77,8 +77,6 @@ public abstract class InputGate implements AsyncDataInput<BufferOrEvent>, AutoCl
 
 	public abstract boolean isFinished();
 
-	public abstract void requestPartitions() throws IOException, InterruptedException;
-
 	/**
 	 * Blocking call waiting for next {@link BufferOrEvent}.
 	 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 696dbe8..bd75262 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -215,6 +215,32 @@ public class SingleInputGate extends InputGate {
 		requestPartitions();
 	}
 
+	private void requestPartitions() throws IOException, InterruptedException {
+		synchronized (requestLock) {
+			if (!requestedPartitionsFlag) { // request only once; later calls skip straight to the flag assignment
+				if (closeFuture.isDone()) {
+					throw new IllegalStateException("Already released.");
+				}
+
+				// Sanity check: the number of channels set so far must equal the expected total
+				if (numberOfInputChannels != inputChannels.size()) {
+					throw new IllegalStateException(String.format(
+						"Bug in input gate setup logic: mismatch between " +
+						"number of total input channels [%s] and the currently set number of input " +
+						"channels [%s].",
+						numberOfInputChannels, // first placeholder: expected total
+						inputChannels.size())); // second placeholder: channels currently set
+				}
+
+				for (InputChannel inputChannel : inputChannels.values()) {
+					inputChannel.requestSubpartition(consumedSubpartitionIndex);
+				}
+			}
+
+			requestedPartitionsFlag = true;
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	// Properties
 	// ------------------------------------------------------------------------
@@ -436,30 +462,6 @@ public class SingleInputGate extends InputGate {
 		return hasReceivedAllEndOfPartitionEvents;
 	}
 
-	@Override
-	public void requestPartitions() throws IOException, InterruptedException {
-		synchronized (requestLock) {
-			if (!requestedPartitionsFlag) {
-				if (closeFuture.isDone()) {
-					throw new IllegalStateException("Already released.");
-				}
-
-				// Sanity checks
-				if (numberOfInputChannels != inputChannels.size()) {
-					throw new IllegalStateException("Bug in input gate setup logic: mismatch between " +
-							"number of total input channels and the currently set number of input " +
-							"channels.");
-				}
-
-				for (InputChannel inputChannel : inputChannels.values()) {
-					inputChannel.requestSubpartition(consumedSubpartitionIndex);
-				}
-			}
-
-			requestedPartitionsFlag = true;
-		}
-	}
-
 	// ------------------------------------------------------------------------
 	// Consume
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 2b5b5c1..c612044 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -86,9 +86,6 @@ public class UnionInputGate extends InputGate {
 	 */
 	private final Map<InputGate, Integer> inputGateToIndexOffsetMap;
 
-	/** Flag indicating whether partitions have been requested. */
-	private boolean requestedPartitionsFlag;
-
 	public UnionInputGate(InputGate... inputGates) {
 		this.inputGates = checkNotNull(inputGates);
 		checkArgument(inputGates.length > 1, "Union input gate should union at least two input gates.");
@@ -142,17 +139,6 @@ public class UnionInputGate extends InputGate {
 	}
 
 	@Override
-	public void requestPartitions() throws IOException, InterruptedException {
-		if (!requestedPartitionsFlag) {
-			for (InputGate inputGate : inputGates) {
-				inputGate.requestPartitions();
-			}
-
-			requestedPartitionsFlag = true;
-		}
-	}
-
-	@Override
 	public Optional<BufferOrEvent> getNext() throws IOException, InterruptedException {
 		return getNextBufferOrEvent(true);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
index 27d01d5..5d2cfd9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
@@ -66,11 +66,6 @@ public class InputGateWithMetrics extends InputGate {
 	}
 
 	@Override
-	public void requestPartitions() throws IOException, InterruptedException {
-		inputGate.requestPartitions();
-	}
-
-	@Override
 	public Optional<BufferOrEvent> getNext() throws IOException, InterruptedException {
 		return updateMetrics(inputGate.getNext());
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index cad957f..20eae98 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.TestingConnectionManager;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
@@ -192,51 +191,52 @@ public class SingleInputGateTest extends InputGateTestBase {
 
 		// Setup reader with one local and one unknown input channel
 
-		final SingleInputGate inputGate = createInputGate();
-		final BufferPool bufferPool = mock(BufferPool.class);
-		when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);
-
-		inputGate.setBufferPool(bufferPool);
-
-		// Local
-		ResultPartitionID localPartitionId = new ResultPartitionID();
+		NettyShuffleEnvironment environment = createNettyShuffleEnvironment();
+		final SingleInputGate inputGate = createInputGate(environment, 2, ResultPartitionType.PIPELINED);
+		try {
+			// Local
+			ResultPartitionID localPartitionId = new ResultPartitionID();
 
-		InputChannelBuilder.newBuilder()
-			.setPartitionId(localPartitionId)
-			.setPartitionManager(partitionManager)
-			.setTaskEventPublisher(taskEventDispatcher)
-			.buildLocalAndSetToGate(inputGate);
+			InputChannelBuilder.newBuilder()
+				.setPartitionId(localPartitionId)
+				.setPartitionManager(partitionManager)
+				.setTaskEventPublisher(taskEventDispatcher)
+				.buildLocalAndSetToGate(inputGate);
 
-		// Unknown
-		ResultPartitionID unknownPartitionId = new ResultPartitionID();
+			// Unknown
+			ResultPartitionID unknownPartitionId = new ResultPartitionID();
 
-		InputChannelBuilder.newBuilder()
-			.setChannelIndex(1)
-			.setPartitionId(unknownPartitionId)
-			.setPartitionManager(partitionManager)
-			.setTaskEventPublisher(taskEventDispatcher)
-			.buildUnknownAndSetToGate(inputGate);
+			InputChannelBuilder.newBuilder()
+				.setChannelIndex(1)
+				.setPartitionId(unknownPartitionId)
+				.setPartitionManager(partitionManager)
+				.setTaskEventPublisher(taskEventDispatcher)
+				.buildUnknownAndSetToGate(inputGate);
 
-		// Request partitions
-		inputGate.requestPartitions();
+			inputGate.setup();
 
-		// Only the local channel can request
-		verify(partitionManager, times(1)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class));
+			// Only the local channel can request
+			verify(partitionManager, times(1)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class));
 
-		// Send event backwards and initialize unknown channel afterwards
-		final TaskEvent event = new TestTaskEvent();
-		inputGate.sendTaskEvent(event);
+			// Send event backwards and initialize unknown channel afterwards
+			final TaskEvent event = new TestTaskEvent();
+			inputGate.sendTaskEvent(event);
 
-		// Only the local channel can send out the event
-		verify(taskEventDispatcher, times(1)).publish(any(ResultPartitionID.class), any(TaskEvent.class));
+			// Only the local channel can send out the event
+			verify(taskEventDispatcher, times(1)).publish(any(ResultPartitionID.class), any(TaskEvent.class));
 
-		// After the update, the pending event should be send to local channel
+			// After the update, the pending event should be send to local channel
 
-		ResourceID location = ResourceID.generate();
-		inputGate.updateInputChannel(location, createRemoteWithIdAndLocation(unknownPartitionId.getPartitionId(), location));
+			ResourceID location = ResourceID.generate();
+			inputGate.updateInputChannel(location, createRemoteWithIdAndLocation(unknownPartitionId.getPartitionId(), location));
 
-		verify(partitionManager, times(2)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class));
-		verify(taskEventDispatcher, times(2)).publish(any(ResultPartitionID.class), any(TaskEvent.class));
+			verify(partitionManager, times(2)).createSubpartitionView(any(ResultPartitionID.class), anyInt(), any(BufferAvailabilityListener.class));
+			verify(taskEventDispatcher, times(2)).publish(any(ResultPartitionID.class), any(TaskEvent.class));
+		}
+		finally {
+			inputGate.close();
+			environment.close();
+		}
 	}
 
 	/**
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java
index 552818d..182afef 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAlignerMassiveRandomTest.java
@@ -152,9 +152,6 @@ public class CheckpointBarrierAlignerMassiveRandomTest {
 		}
 
 		@Override
-		public void requestPartitions() {}
-
-		@Override
 		public Optional<BufferOrEvent> getNext() throws IOException, InterruptedException {
 			currentChannel = (currentChannel + 1) % numberOfChannels;
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
index 8cb6848..7cc7194 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
@@ -98,10 +98,6 @@ public class MockInputGate extends InputGate {
 	}
 
 	@Override
-	public void requestPartitions() {
-	}
-
-	@Override
 	public void sendTaskEvent(TaskEvent event) {
 	}
 


[flink] 02/05: [hotfix][test] Move CloseableRegistry as field in InputBuffersMetricsTest

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 479b6890bf1e1e8f4634903975b4983c96913dc2
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Jul 5 16:33:10 2019 +0200

    [hotfix][test] Move CloseableRegistry as field in InputBuffersMetricsTest
---
 .../consumer/InputBuffersMetricsTest.java          | 113 +++++++++++----------
 1 file changed, 59 insertions(+), 54 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
index a602648..8d868cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
@@ -30,6 +30,8 @@ import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -43,6 +45,18 @@ import static org.junit.Assert.assertEquals;
  */
 public class InputBuffersMetricsTest extends TestLogger {
 
+	private CloseableRegistry closeableRegistry;
+
+	@Before
+	public void setup() {
+		closeableRegistry = new CloseableRegistry();
+	}
+
+	@After
+	public void tearDown() throws IOException {
+		closeableRegistry.close();
+	}
+
 	@Test
 	public void testCalculateTotalBuffersSize() throws IOException {
 		int numberOfRemoteChannels = 2;
@@ -55,11 +69,13 @@ public class InputBuffersMetricsTest extends TestLogger {
 			.setNetworkBuffersPerChannel(numberOfBufferPerChannel)
 			.setFloatingNetworkBuffersPerGate(numberOfBuffersPerGate)
 			.build();
+		closeableRegistry.registerCloseable(network::close);
 
 		SingleInputGate inputGate1 = buildInputGate(
 			network,
 			numberOfRemoteChannels,
 			numberOfLocalChannels).f0;
+		closeableRegistry.registerCloseable(inputGate1::close);
 
 		SingleInputGate[] inputGates = new SingleInputGate[]{inputGate1};
 		FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
@@ -69,15 +85,12 @@ public class InputBuffersMetricsTest extends TestLogger {
 			exclusiveBuffersUsageGauge,
 			inputGates);
 
-		try (CloseableRegistry closeableRegistry = new CloseableRegistry()) {
+		closeableRegistry.registerCloseable(network::close);
+		closeableRegistry.registerCloseable(inputGate1::close);
 
-			closeableRegistry.registerCloseable(network::close);
-			closeableRegistry.registerCloseable(inputGate1::close);
-
-			assertEquals(numberOfBuffersPerGate, floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
-			assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel, exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
-			assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel + numberOfBuffersPerGate, inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
-		}
+		assertEquals(numberOfBuffersPerGate, floatingBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+		assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel, exclusiveBuffersUsageGauge.calculateTotalBuffers(inputGate1));
+		assertEquals(numberOfRemoteChannels * numberOfBufferPerChannel + numberOfBuffersPerGate, inputBufferPoolUsageGauge.calculateTotalBuffers(inputGate1));
 	}
 
 	@Test
@@ -96,6 +109,7 @@ public class InputBuffersMetricsTest extends TestLogger {
 			.setNetworkBuffersPerChannel(buffersPerChannel)
 			.setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate)
 			.build();
+		closeableRegistry.registerCloseable(network::close);
 
 		Tuple2<SingleInputGate, List<RemoteInputChannel>> tuple1 = buildInputGate(
 			network,
@@ -108,6 +122,8 @@ public class InputBuffersMetricsTest extends TestLogger {
 
 		SingleInputGate inputGate1 = tuple1.f0;
 		SingleInputGate inputGate2 = tuple2.f0;
+		closeableRegistry.registerCloseable(inputGate1::close);
+		closeableRegistry.registerCloseable(inputGate2::close);
 
 		List<RemoteInputChannel> remoteInputChannels = tuple1.f1;
 
@@ -119,29 +135,22 @@ public class InputBuffersMetricsTest extends TestLogger {
 			exclusiveBuffersUsageGauge,
 			inputGates);
 
-		try (CloseableRegistry closeableRegistry = new CloseableRegistry()) {
-			assertEquals(0.0, exclusiveBuffersUsageGauge.getValue(), 0.0);
-			assertEquals(0.0, inputBuffersUsageGauge.getValue(), 0.0);
-
-			int totalBuffers = extraNetworkBuffersPerGate * inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels;
-
-			int channelIndex = 1;
-			for (RemoteInputChannel channel : remoteInputChannels) {
-				drainAndValidate(
-					buffersPerChannel,
-					buffersPerChannel * channelIndex++,
-					channel,
-					closeableRegistry,
-					totalBuffers,
-					buffersPerChannel * totalNumberOfRemoteChannels,
-					exclusiveBuffersUsageGauge,
-					inputBuffersUsageGauge,
-					inputGate1);
-			}
-		} finally {
-			inputGate1.close();
-			inputGate2.close();
-			network.close();
+		assertEquals(0.0, exclusiveBuffersUsageGauge.getValue(), 0.0);
+		assertEquals(0.0, inputBuffersUsageGauge.getValue(), 0.0);
+
+		int totalBuffers = extraNetworkBuffersPerGate * inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels;
+
+		int channelIndex = 1;
+		for (RemoteInputChannel channel : remoteInputChannels) {
+			drainAndValidate(
+				buffersPerChannel,
+				buffersPerChannel * channelIndex++,
+				channel,
+				totalBuffers,
+				buffersPerChannel * totalNumberOfRemoteChannels,
+				exclusiveBuffersUsageGauge,
+				inputBuffersUsageGauge,
+				inputGate1);
 		}
 	}
 
@@ -162,6 +171,7 @@ public class InputBuffersMetricsTest extends TestLogger {
 			.setNetworkBuffersPerChannel(buffersPerChannel)
 			.setFloatingNetworkBuffersPerGate(extraNetworkBuffersPerGate)
 			.build();
+		closeableRegistry.registerCloseable(network::close);
 
 		Tuple2<SingleInputGate, List<RemoteInputChannel>> tuple1 = buildInputGate(
 			network,
@@ -173,6 +183,8 @@ public class InputBuffersMetricsTest extends TestLogger {
 			numberOfLocalChannelsGate2).f0;
 
 		SingleInputGate inputGate1 = tuple1.f0;
+		closeableRegistry.registerCloseable(inputGate1::close);
+		closeableRegistry.registerCloseable(inputGate2::close);
 
 		RemoteInputChannel remoteInputChannel1 = tuple1.f1.get(0);
 
@@ -184,54 +196,47 @@ public class InputBuffersMetricsTest extends TestLogger {
 			exclusiveBuffersUsageGauge,
 			inputGates);
 
-		try (CloseableRegistry closeableRegistry = new CloseableRegistry()) {
-			assertEquals(0.0, floatingBuffersUsageGauge.getValue(), 0.0);
-			assertEquals(0.0, inputBuffersUsageGauge.getValue(), 0.0);
+		assertEquals(0.0, floatingBuffersUsageGauge.getValue(), 0.0);
+		assertEquals(0.0, inputBuffersUsageGauge.getValue(), 0.0);
 
-			// drain gate1's exclusive buffers
-			drainBuffer(buffersPerChannel, remoteInputChannel1, closeableRegistry);
+		// drain gate1's exclusive buffers
+		drainBuffer(buffersPerChannel, remoteInputChannel1);
 
-			int totalBuffers = extraNetworkBuffersPerGate * inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels;
+		int totalBuffers = extraNetworkBuffersPerGate * inputGates.length + buffersPerChannel * totalNumberOfRemoteChannels;
 
-			remoteInputChannel1.requestSubpartition(0);
+		remoteInputChannel1.requestSubpartition(0);
 
-			int backlog = 3;
-			int totalRequestedBuffers = buffersPerChannel + backlog;
+		int backlog = 3;
+		int totalRequestedBuffers = buffersPerChannel + backlog;
 
-			remoteInputChannel1.onSenderBacklog(backlog);
+		remoteInputChannel1.onSenderBacklog(backlog);
 
-			assertEquals(totalRequestedBuffers, remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable());
+		assertEquals(totalRequestedBuffers, remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable());
 
-			drainBuffer(totalRequestedBuffers, remoteInputChannel1, closeableRegistry);
+		drainBuffer(totalRequestedBuffers, remoteInputChannel1);
 
-			assertEquals(0, remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable());
-			assertEquals((double) (buffersPerChannel + totalRequestedBuffers) / totalBuffers,
-				inputBuffersUsageGauge.getValue(), 0.0001);
-		} finally {
-			inputGate1.close();
-			inputGate2.close();
-			network.close();
-		}
+		assertEquals(0, remoteInputChannel1.unsynchronizedGetFloatingBuffersAvailable());
+		assertEquals((double) (buffersPerChannel + totalRequestedBuffers) / totalBuffers,
+			inputBuffersUsageGauge.getValue(), 0.0001);
 	}
 
 	private void drainAndValidate(
 		int numBuffersToRequest,
 		int totalRequestedBuffers,
 		RemoteInputChannel channel,
-		CloseableRegistry closeableRegistry,
 		int totalBuffers,
 		int totalExclusiveBuffers,
 		ExclusiveBuffersUsageGauge exclusiveBuffersUsageGauge,
 		CreditBasedInputBuffersUsageGauge inputBuffersUsageGauge,
 		SingleInputGate inputGate) throws IOException {
 
-		drainBuffer(numBuffersToRequest, channel, closeableRegistry);
+		drainBuffer(numBuffersToRequest, channel);
 		assertEquals(totalRequestedBuffers, exclusiveBuffersUsageGauge.calculateUsedBuffers(inputGate));
 		assertEquals((double) totalRequestedBuffers / totalExclusiveBuffers, exclusiveBuffersUsageGauge.getValue(), 0.0001);
 		assertEquals((double) totalRequestedBuffers / totalBuffers, inputBuffersUsageGauge.getValue(), 0.0001);
 	}
 
-	private void drainBuffer(int boundary, RemoteInputChannel channel, CloseableRegistry closeableRegistry) throws IOException {
+	private void drainBuffer(int boundary, RemoteInputChannel channel) throws IOException {
 		for (int i = 0; i < boundary; i++) {
 			Buffer buffer = channel.requestBuffer();
 			if (buffer != null) {


[flink] 01/05: [hotfix][test] Guarantee order of CloseableRegistry

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a68dc3c440c79c031d77d0082b7f8e4707311382
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Jul 5 16:50:57 2019 +0200

    [hotfix][test] Guarantee order of CloseableRegistry
---
 .../java/org/apache/flink/core/fs/CloseableRegistry.java | 16 ++++++++++++++--
 .../org/apache/flink/util/AbstractCloseableRegistry.java |  8 ++++++--
 2 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
index 5f1c9fb..07e8b5b 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
@@ -24,7 +24,10 @@ import org.apache.flink.util.AbstractCloseableRegistry;
 import javax.annotation.Nonnull;
 
 import java.io.Closeable;
-import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 /**
@@ -33,6 +36,8 @@ import java.util.Map;
  * <p>Registering to an already closed registry will throw an exception and close the provided {@link Closeable}
  *
  * <p>All methods in this class are thread-safe.
+ *
+ * <p>This class closes all registered {@link Closeable}s in the reverse registration order.
  */
 @Internal
 public class CloseableRegistry extends AbstractCloseableRegistry<Closeable, Object> {
@@ -40,7 +45,7 @@ public class CloseableRegistry extends AbstractCloseableRegistry<Closeable, Obje
 	private static final Object DUMMY = new Object();
 
 	public CloseableRegistry() {
-		super(new HashMap<>());
+		super(new LinkedHashMap<>());
 	}
 
 	@Override
@@ -52,4 +57,11 @@ public class CloseableRegistry extends AbstractCloseableRegistry<Closeable, Obje
 	protected boolean doUnRegister(@Nonnull Closeable closeable, @Nonnull Map<Closeable, Object> closeableMap) {
 		return closeableMap.remove(closeable) != null;
 	}
+
+	@Override
+	protected Collection<Closeable> getReferencesToClose() {
+		ArrayList<Closeable> closeablesToClose = new ArrayList<>(closeableToRef.keySet());
+		Collections.reverse(closeablesToClose);
+		return closeablesToClose;
+	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
index e6589f6..ff4febd 100644
--- a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
@@ -49,7 +49,7 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
 
 	/** Map from tracked Closeables to some associated meta data. */
 	@GuardedBy("lock")
-	private final Map<Closeable, T> closeableToRef;
+	protected final Map<Closeable, T> closeableToRef;
 
 	/** Indicates if this registry is closed. */
 	@GuardedBy("lock")
@@ -114,7 +114,7 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
 
 			closed = true;
 
-			toCloseCopy = new ArrayList<>(closeableToRef.keySet());
+			toCloseCopy = getReferencesToClose();
 
 			closeableToRef.clear();
 		}
@@ -128,6 +128,10 @@ public abstract class AbstractCloseableRegistry<C extends Closeable, T> implemen
 		}
 	}
 
+	protected Collection<Closeable> getReferencesToClose() {
+		return new ArrayList<>(closeableToRef.keySet());
+	}
+
 	/**
 	 * Does the actual registration of the closeable with the registry map. This should not do any long running or
 	 * potentially blocking operations as is is executed under the registry's lock.


[flink] 03/05: [hotfix][test] Deduplicate NettyShuffleEnvironmentTest code

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 282646ec7e84b4dfe17d95a18585510577366a82
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue Jul 9 14:33:27 2019 +0200

    [hotfix][test] Deduplicate NettyShuffleEnvironmentTest code
---
 .../io/network/NettyShuffleEnvironmentBuilder.java |  4 +-
 .../io/network/NettyShuffleEnvironmentTest.java    | 67 +++-------------------
 2 files changed, 10 insertions(+), 61 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
index 32b3744..517be70 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
@@ -30,9 +30,11 @@ import org.apache.flink.runtime.util.EnvironmentInformation;
  */
 public class NettyShuffleEnvironmentBuilder {
 
+	public static final int DEFAULT_NUM_NETWORK_BUFFERS = 1024;
+
 	private static final String[] DEFAULT_TEMP_DIRS = new String[] {EnvironmentInformation.getTemporaryFileDirectory()};
 
-	private int numNetworkBuffers = 1024;
+	private int numNetworkBuffers = DEFAULT_NUM_NETWORK_BUFFERS;
 
 	private int networkBufferSize = 32 * 1024;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
index 11951f2..ab847ad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
@@ -89,62 +89,7 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
 	 */
 	@Test
 	public void testRegisterTaskUsesBoundedBuffers() throws Exception {
-		final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
-			.setIsCreditBased(enableCreditBasedFlowControl)
-			.build();
-
-		// result partitions
-		ResultPartition rp1 = createPartition(network, ResultPartitionType.PIPELINED, 2);
-		ResultPartition rp2 = createPartition(network, fileChannelManager, ResultPartitionType.BLOCKING, 2);
-		ResultPartition rp3 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 2);
-		ResultPartition rp4 = createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 8);
-		final ResultPartition[] resultPartitions = new ResultPartition[] {rp1, rp2, rp3, rp4};
-
-		// input gates
-		SingleInputGate ig1 = createSingleInputGate(network, ResultPartitionType.PIPELINED, 2);
-		SingleInputGate ig2 = createSingleInputGate(network, ResultPartitionType.BLOCKING, 2);
-		SingleInputGate ig3 = createSingleInputGate(network, ResultPartitionType.PIPELINED_BOUNDED, 2);
-		SingleInputGate ig4 = createSingleInputGate(network, ResultPartitionType.PIPELINED_BOUNDED, 8);
-		final SingleInputGate[] inputGates = new SingleInputGate[] {ig1, ig2, ig3, ig4};
-
-		Task.setupPartitionsAndGates(resultPartitions, inputGates);
-
-		// verify buffer pools for the result partitions
-		assertEquals(rp1.getNumberOfSubpartitions(), rp1.getBufferPool().getNumberOfRequiredMemorySegments());
-		assertEquals(rp2.getNumberOfSubpartitions(), rp2.getBufferPool().getNumberOfRequiredMemorySegments());
-		assertEquals(rp3.getNumberOfSubpartitions(), rp3.getBufferPool().getNumberOfRequiredMemorySegments());
-		assertEquals(rp4.getNumberOfSubpartitions(), rp4.getBufferPool().getNumberOfRequiredMemorySegments());
-
-		assertEquals(Integer.MAX_VALUE, rp1.getBufferPool().getMaxNumberOfMemorySegments());
-		assertEquals(Integer.MAX_VALUE, rp2.getBufferPool().getMaxNumberOfMemorySegments());
-		assertEquals(2 * 2 + 8, rp3.getBufferPool().getMaxNumberOfMemorySegments());
-		assertEquals(8 * 2 + 8, rp4.getBufferPool().getMaxNumberOfMemorySegments());
-
-		// verify buffer pools for the input gates (NOTE: credit-based uses minimum required buffers
-		// for exclusive buffers not managed by the buffer pool)
-		assertEquals(enableCreditBasedFlowControl ? 0 : 2, ig1.getBufferPool().getNumberOfRequiredMemorySegments());
-		assertEquals(enableCreditBasedFlowControl ? 0 : 2, ig2.getBufferPool().getNumberOfRequiredMemorySegments());
-		assertEquals(enableCreditBasedFlowControl ? 0 : 2, ig3.getBufferPool().getNumberOfRequiredMemorySegments());
-		assertEquals(enableCreditBasedFlowControl ? 0 : 8, ig4.getBufferPool().getNumberOfRequiredMemorySegments());
-
-		assertEquals(Integer.MAX_VALUE, ig1.getBufferPool().getMaxNumberOfMemorySegments());
-		assertEquals(Integer.MAX_VALUE, ig2.getBufferPool().getMaxNumberOfMemorySegments());
-		assertEquals(enableCreditBasedFlowControl ? 8 : 2 * 2 + 8, ig3.getBufferPool().getMaxNumberOfMemorySegments());
-		assertEquals(enableCreditBasedFlowControl ? 8 : 8 * 2 + 8, ig4.getBufferPool().getMaxNumberOfMemorySegments());
-
-		int invokations = enableCreditBasedFlowControl ? 1 : 0;
-		verify(ig1, times(invokations)).assignExclusiveSegments();
-		verify(ig2, times(invokations)).assignExclusiveSegments();
-		verify(ig3, times(invokations)).assignExclusiveSegments();
-		verify(ig4, times(invokations)).assignExclusiveSegments();
-
-		for (ResultPartition rp : resultPartitions) {
-			rp.release();
-		}
-		for (SingleInputGate ig : inputGates) {
-			ig.close();
-		}
-		network.close();
+		testRegisterTaskWithLimitedBuffers(NettyShuffleEnvironmentBuilder.DEFAULT_NUM_NETWORK_BUFFERS, false);
 	}
 
 	/**
@@ -164,7 +109,7 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
 			bufferCount = 10 + 10 * NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue();
 		}
 
-		testRegisterTaskWithLimitedBuffers(bufferCount);
+		testRegisterTaskWithLimitedBuffers(bufferCount, true);
 	}
 
 	/**
@@ -185,10 +130,10 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
 
 		expectedException.expect(IOException.class);
 		expectedException.expectMessage("Insufficient number of network buffers");
-		testRegisterTaskWithLimitedBuffers(bufferCount);
+		testRegisterTaskWithLimitedBuffers(bufferCount, true);
 	}
 
-	private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize) throws Exception {
+	private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize, boolean assertNumBuffers) throws Exception {
 		final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
 			.setNumNetworkBuffers(bufferPoolSize)
 			.setIsCreditBased(enableCreditBasedFlowControl)
@@ -238,7 +183,9 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
 
 		for (ResultPartition rp : resultPartitions) {
 			assertEquals(rp.getNumberOfSubpartitions(), rp.getBufferPool().getNumberOfRequiredMemorySegments());
-			assertEquals(rp.getNumberOfSubpartitions(), rp.getBufferPool().getNumBuffers());
+			if (assertNumBuffers) {
+				assertEquals(rp.getNumberOfSubpartitions(), rp.getBufferPool().getNumBuffers());
+			}
 		}
 
 		// verify buffer pools for the input gates (NOTE: credit-based uses minimum required buffers


[flink] 04/05: [FLINK-13013][network] Request partitions during InputGate#setup

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c27d0d68dc76ef4818f12296b28d78d6a78d4965
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Mon Jul 1 11:57:33 2019 +0200

    [FLINK-13013][network] Request partitions during InputGate#setup
    
    Previously, partitions were requested on the first attempt to poll/get the next buffer,
    which caused a couple of issues:
    - it was a little bit confusing
    - after the first requestPartitions call, this caused unnecessary synchronisation overhead
    - it prevented data notifications from coming through, so the isAvailable() future was never
      completed before the first attempt to read data from the input gate
    
    This commit moves requesting partitions to InputGate#setup, solving those issues.
---
 .../io/network/partition/consumer/InputGate.java   |  2 +-
 .../consumer/RemoteChannelStateChecker.java        |  9 +-
 .../partition/consumer/SingleInputGate.java        |  5 +-
 .../network/partition/consumer/UnionInputGate.java |  3 -
 .../runtime/taskmanager/InputGateWithMetrics.java  |  2 +-
 .../org/apache/flink/runtime/taskmanager/Task.java |  4 +-
 .../io/network/NettyShuffleEnvironmentTest.java    | 30 +++----
 .../runtime/io/network/buffer/NoOpBufferPool.java  | 95 ++++++++++++++++++++++
 .../network/partition/InputChannelTestUtils.java   | 21 +++++
 .../network/partition/InputGateFairnessTest.java   | 29 ++++++-
 .../io/network/partition/PartitionTestUtils.java   |  2 +
 .../consumer/InputBuffersMetricsTest.java          | 36 ++++++--
 .../partition/consumer/LocalInputChannelTest.java  |  8 +-
 .../partition/consumer/SingleInputGateBuilder.java |  5 ++
 .../partition/consumer/SingleInputGateTest.java    | 23 +++++-
 .../apache/flink/runtime/taskmanager/TaskTest.java | 26 ++++--
 .../StreamNetworkBenchmarkEnvironment.java         |  3 +-
 .../StreamNetworkPointToPointBenchmark.java        |  2 +-
 .../StreamNetworkThroughputBenchmark.java          |  2 +-
 19 files changed, 252 insertions(+), 55 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 0ce446b..e9f2399 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -130,5 +130,5 @@ public abstract class InputGate implements AsyncDataInput<BufferOrEvent>, AutoCl
 	/**
 	 * Setup gate, potentially heavy-weight, blocking operation compared to just creation.
 	 */
-	public abstract void setup() throws IOException;
+	public abstract void setup() throws IOException, InterruptedException;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteChannelStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteChannelStateChecker.java
index 69ee3fd..4bcfb4c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteChannelStateChecker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteChannelStateChecker.java
@@ -46,7 +46,8 @@ public class RemoteChannelStateChecker {
 
 	public boolean isProducerReadyOrAbortConsumption(ResponseHandle responseHandle) {
 		Either<ExecutionState, Throwable> result = responseHandle.getProducerExecutionState();
-		if (responseHandle.getConsumerExecutionState() != ExecutionState.RUNNING) {
+		ExecutionState consumerExecutionState = responseHandle.getConsumerExecutionState();
+		if (!isConsumerStateValidForConsumption(consumerExecutionState)) {
 			LOG.debug(
 				"Ignore a partition producer state notification for task {}, because it's not running.",
 				taskNameWithSubtask);
@@ -64,6 +65,12 @@ public class RemoteChannelStateChecker {
 		return false;
 	}
 
+	private static boolean isConsumerStateValidForConsumption(
+			ExecutionState consumerExecutionState) {
+		return consumerExecutionState == ExecutionState.RUNNING ||
+			consumerExecutionState == ExecutionState.DEPLOYING;
+	}
+
 	private boolean isProducerConsumerReady(ResponseHandle responseHandle) {
 		ExecutionState producerState = getProducerState(responseHandle);
 		return producerState == ExecutionState.SCHEDULED ||
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index b23572d..696dbe8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -202,7 +202,7 @@ public class SingleInputGate extends InputGate {
 	}
 
 	@Override
-	public void setup() throws IOException {
+	public void setup() throws IOException, InterruptedException {
 		checkState(this.bufferPool == null, "Bug in input gate setup logic: Already registered buffer pool.");
 		if (isCreditBased) {
 			// assign exclusive buffers to input channels directly and use the rest for floating buffers
@@ -211,6 +211,8 @@ public class SingleInputGate extends InputGate {
 
 		BufferPool bufferPool = bufferPoolFactory.get();
 		setBufferPool(bufferPool);
+
+		requestPartitions();
 	}
 
 	// ------------------------------------------------------------------------
@@ -481,7 +483,6 @@ public class SingleInputGate extends InputGate {
 			throw new IllegalStateException("Released");
 		}
 
-		requestPartitions();
 		Optional<InputWithData<InputChannel, BufferAndAvailability>> next = waitAndGetNextData(blocking);
 		if (!next.isPresent()) {
 			return Optional.empty();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 65a15ff..2b5b5c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -167,9 +167,6 @@ public class UnionInputGate extends InputGate {
 			return Optional.empty();
 		}
 
-		// Make sure to request the partitions, if they have not been requested before.
-		requestPartitions();
-
 		Optional<InputWithData<InputGate, BufferOrEvent>> next = waitAndGetNextData(blocking);
 		if (!next.isPresent()) {
 			return Optional.empty();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
index 669c02e..27d01d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
@@ -61,7 +61,7 @@ public class InputGateWithMetrics extends InputGate {
 	}
 
 	@Override
-	public void setup() throws IOException {
+	public void setup() throws IOException, InterruptedException {
 		inputGate.setup();
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 62886a5..4355821 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -834,12 +834,14 @@ public class Task implements Runnable, TaskActions, PartitionProducerStateProvid
 
 	@VisibleForTesting
 	public static void setupPartitionsAndGates(
-		ResultPartitionWriter[] producedPartitions, InputGate[] inputGates) throws IOException {
+		ResultPartitionWriter[] producedPartitions, InputGate[] inputGates) throws IOException, InterruptedException {
 
 		for (ResultPartitionWriter partition : producedPartitions) {
 			partition.setup();
 		}
 
+		// InputGates must be initialized after the partitions, since during InputGate#setup
+		// we are requesting partitions
 		for (InputGate gate : inputGates) {
 			gate.setup();
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
index ab847ad..ba70a47 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
@@ -155,23 +155,19 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
 		SingleInputGate ig4 = createSingleInputGate(network, ResultPartitionType.PIPELINED_BOUNDED, 4);
 		final SingleInputGate[] inputGates = new SingleInputGate[] {ig1, ig2, ig3, ig4};
 
-		// set up remote input channels for the exclusive buffers of the credit-based flow control
-		// (note that this does not obey the partition types which is ok for the scope of the test)
-		if (enableCreditBasedFlowControl) {
-			createRemoteInputChannel(ig4, 0, rp1, connManager, network.getNetworkBufferPool());
-			createRemoteInputChannel(ig4, 0, rp2, connManager, network.getNetworkBufferPool());
-			createRemoteInputChannel(ig4, 0, rp3, connManager, network.getNetworkBufferPool());
-			createRemoteInputChannel(ig4, 0, rp4, connManager, network.getNetworkBufferPool());
-
-			createRemoteInputChannel(ig1, 1, rp1, connManager, network.getNetworkBufferPool());
-			createRemoteInputChannel(ig1, 1, rp4, connManager, network.getNetworkBufferPool());
-
-			createRemoteInputChannel(ig2, 1, rp2, connManager, network.getNetworkBufferPool());
-			createRemoteInputChannel(ig2, 2, rp4, connManager, network.getNetworkBufferPool());
-
-			createRemoteInputChannel(ig3, 1, rp3, connManager, network.getNetworkBufferPool());
-			createRemoteInputChannel(ig3, 3, rp4, connManager, network.getNetworkBufferPool());
-		}
+		createRemoteInputChannel(ig4, 0, rp1, connManager, network.getNetworkBufferPool());
+		createRemoteInputChannel(ig4, 0, rp2, connManager, network.getNetworkBufferPool());
+		createRemoteInputChannel(ig4, 0, rp3, connManager, network.getNetworkBufferPool());
+		createRemoteInputChannel(ig4, 0, rp4, connManager, network.getNetworkBufferPool());
+
+		createRemoteInputChannel(ig1, 1, rp1, connManager, network.getNetworkBufferPool());
+		createRemoteInputChannel(ig1, 1, rp4, connManager, network.getNetworkBufferPool());
+
+		createRemoteInputChannel(ig2, 1, rp2, connManager, network.getNetworkBufferPool());
+		createRemoteInputChannel(ig2, 2, rp4, connManager, network.getNetworkBufferPool());
+
+		createRemoteInputChannel(ig3, 1, rp3, connManager, network.getNetworkBufferPool());
+		createRemoteInputChannel(ig3, 3, rp4, connManager, network.getNetworkBufferPool());
 
 		Task.setupPartitionsAndGates(resultPartitions, inputGates);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
new file mode 100644
index 0000000..04c9c04
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NoOpBufferPool.java
@@ -0,0 +1,95 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+// We have it in this package because we could not mock the methods otherwise
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+import java.io.IOException;
+
+/**
+ * Test stub of {@link BufferPool}: {@link #lazyDestroy()} does nothing, all other methods throw {@link UnsupportedOperationException}.
+ */
+public class NoOpBufferPool implements BufferPool {
+
+	@Override
+	public void lazyDestroy() {
+	}
+
+	@Override
+	public Buffer requestBuffer() throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public Buffer requestBufferBlocking() throws IOException, InterruptedException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public BufferBuilder requestBufferBuilderBlocking() throws IOException, InterruptedException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean addBufferListener(BufferListener listener) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean isDestroyed() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public int getNumberOfRequiredMemorySegments() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public int getMaxNumberOfMemorySegments() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public int getNumBuffers() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void setNumBuffers(int numBuffers) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public int getNumberOfAvailableMemorySegments() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public int bestEffortGetNumOfUsedBuffers() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void recycle(MemorySegment memorySegment) {
+		throw new UnsupportedOperationException();
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index 16d6cab..7805ef8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.core.memory.MemorySegmentProvider;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
@@ -195,4 +196,24 @@ public class InputChannelTestUtils {
 		public void recycleMemorySegments(Collection<MemorySegment> segments) {
 		}
 	}
+
+	/**
+	 * {@link MemorySegmentProvider} that provides unpooled {@link MemorySegment}s.
+	 */
+	public static class UnpooledMemorySegmentProvider implements MemorySegmentProvider {
+		private final int pageSize;
+
+		public UnpooledMemorySegmentProvider(int pageSize) {
+			this.pageSize = pageSize;
+		}
+
+		@Override
+		public Collection<MemorySegment> requestMemorySegments() {
+			return Collections.singletonList(MemorySegmentFactory.allocateUnpooledSegment(pageSize));
+		}
+
+		@Override
+		public void recycleMemorySegments(Collection<MemorySegment> segments) {
+		}
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index 2bf5a09..da05f83 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -24,8 +24,11 @@ import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NoOpBufferPool;
+import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.UnpooledMemorySegmentProvider;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
@@ -47,7 +50,6 @@ import java.util.Optional;
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
 import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createDummyConnectionManager;
 import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createLocalInputChannel;
-import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createRemoteInputChannel;
 import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createResultPartitionManager;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -93,6 +95,8 @@ public class InputGateFairnessTest {
 			createLocalInputChannel(gate, i, resultPartitionManager);
 		}
 
+		gate.setup();
+
 		// read all the buffers and the EOF event
 		for (int i = numberOfChannels * (buffersPerChannel + 1); i > 0; --i) {
 			assertNotNull(gate.getNext());
@@ -141,6 +145,8 @@ public class InputGateFairnessTest {
 			// seed one initial buffer
 			sources[12].add(bufferConsumer.copy());
 
+			gate.setup();
+
 			// read all the buffers and the EOF event
 			for (int i = 0; i < numberOfChannels * buffersPerChannel; i++) {
 				assertNotNull(gate.getNext());
@@ -190,6 +196,8 @@ public class InputGateFairnessTest {
 			channel.onBuffer(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), buffersPerChannel, -1);
 		}
 
+		gate.setup();
+
 		// read all the buffers and the EOF event
 		for (int i = numberOfChannels * (buffersPerChannel + 1); i > 0; --i) {
 			assertNotNull(gate.getNext());
@@ -233,6 +241,8 @@ public class InputGateFairnessTest {
 		channels[11].onBuffer(mockBuffer, 0, -1);
 		channelSequenceNums[11]++;
 
+		gate.setup();
+
 		// read all the buffers and the EOF event
 		for (int i = 0; i < numberOfChannels * buffersPerChannel; i++) {
 			assertNotNull(gate.getNext());
@@ -308,9 +318,8 @@ public class InputGateFairnessTest {
 	// ------------------------------------------------------------------------
 
 	private static class FairnessVerifyingInputGate extends SingleInputGate {
-		private static final SupplierWithException<BufferPool, IOException> STUB_BUFFER_POOL_FACTORY = () -> {
-			throw new UnsupportedOperationException();
-		};
+		private static final SupplierWithException<BufferPool, IOException> STUB_BUFFER_POOL_FACTORY =
+			NoOpBufferPool::new;
 
 		private final ArrayDeque<InputChannel> channelsWithData;
 
@@ -368,4 +377,16 @@ public class InputGateFairnessTest {
 			uniquenessChecker.clear();
 		}
 	}
+
+	public static RemoteInputChannel createRemoteInputChannel(
+		SingleInputGate inputGate,
+		int channelIndex,
+		ConnectionManager connectionManager) {
+
+		return InputChannelBuilder.newBuilder()
+			.setChannelIndex(channelIndex)
+			.setConnectionManager(connectionManager)
+			.setMemorySegmentProvider(new UnpooledMemorySegmentProvider(32 * 1024))
+			.buildRemoteAndSetToGate(inputGate);
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index e559659..5e39a43 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -63,6 +63,7 @@ public class PartitionTestUtils {
 			ResultPartitionType partitionType,
 			int numChannels) {
 		return new ResultPartitionBuilder()
+			.setResultPartitionManager(environment.getResultPartitionManager())
 			.setupBufferPoolFactoryFromNettyShuffleEnvironment(environment)
 			.setResultPartitionType(partitionType)
 			.setNumberOfSubpartitions(numChannels)
@@ -75,6 +76,7 @@ public class PartitionTestUtils {
 			ResultPartitionType partitionType,
 			int numChannels) {
 		return new ResultPartitionBuilder()
+			.setResultPartitionManager(environment.getResultPartitionManager())
 			.setupBufferPoolFactoryFromNettyShuffleEnvironment(environment)
 			.setFileChannelManager(channelManager)
 			.setResultPartitionType(partitionType)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
index 8d868cc..9b17a2e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputBuffersMetricsTest.java
@@ -27,6 +27,8 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.metrics.CreditBasedInputBuffersUsageGauge;
 import org.apache.flink.runtime.io.network.metrics.ExclusiveBuffersUsageGauge;
 import org.apache.flink.runtime.io.network.metrics.FloatingBuffersUsageGauge;
+import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.util.TestLogger;
 
@@ -58,7 +60,7 @@ public class InputBuffersMetricsTest extends TestLogger {
 	}
 
 	@Test
-	public void testCalculateTotalBuffersSize() throws IOException {
+	public void testCalculateTotalBuffersSize() throws Exception {
 		int numberOfRemoteChannels = 2;
 		int numberOfLocalChannels = 0;
 
@@ -76,6 +78,7 @@ public class InputBuffersMetricsTest extends TestLogger {
 			numberOfRemoteChannels,
 			numberOfLocalChannels).f0;
 		closeableRegistry.registerCloseable(inputGate1::close);
+		inputGate1.setup();
 
 		SingleInputGate[] inputGates = new SingleInputGate[]{inputGate1};
 		FloatingBuffersUsageGauge floatingBuffersUsageGauge = new FloatingBuffersUsageGauge(inputGates);
@@ -94,7 +97,7 @@ public class InputBuffersMetricsTest extends TestLogger {
 	}
 
 	@Test
-	public void testExclusiveBuffersUsage() throws IOException {
+	public void testExclusiveBuffersUsage() throws Exception {
 		int numberOfRemoteChannelsGate1 = 2;
 		int numberOfLocalChannelsGate1 = 0;
 		int numberOfRemoteChannelsGate2 = 1;
@@ -124,6 +127,8 @@ public class InputBuffersMetricsTest extends TestLogger {
 		SingleInputGate inputGate2 = tuple2.f0;
 		closeableRegistry.registerCloseable(inputGate1::close);
 		closeableRegistry.registerCloseable(inputGate2::close);
+		inputGate1.setup();
+		inputGate2.setup();
 
 		List<RemoteInputChannel> remoteInputChannels = tuple1.f1;
 
@@ -155,7 +160,7 @@ public class InputBuffersMetricsTest extends TestLogger {
 	}
 
 	@Test
-	public void testFloatingBuffersUsage() throws IOException, InterruptedException {
+	public void testFloatingBuffersUsage() throws Exception {
 
 		int numberOfRemoteChannelsGate1 = 2;
 		int numberOfLocalChannelsGate1 = 0;
@@ -185,6 +190,8 @@ public class InputBuffersMetricsTest extends TestLogger {
 		SingleInputGate inputGate1 = tuple1.f0;
 		closeableRegistry.registerCloseable(inputGate1::close);
 		closeableRegistry.registerCloseable(inputGate2::close);
+		inputGate1.setup();
+		inputGate2.setup();
 
 		RemoteInputChannel remoteInputChannel1 = tuple1.f1.get(0);
 
@@ -250,7 +257,7 @@ public class InputBuffersMetricsTest extends TestLogger {
 	private Tuple2<SingleInputGate, List<RemoteInputChannel>> buildInputGate(
 		NettyShuffleEnvironment network,
 		int numberOfRemoteChannels,
-		int numberOfLocalChannels) throws IOException {
+		int numberOfLocalChannels) throws Exception {
 
 		SingleInputGate inputGate = new SingleInputGateBuilder()
 			.setNumberOfChannels(numberOfRemoteChannels + numberOfLocalChannels)
@@ -262,22 +269,31 @@ public class InputBuffersMetricsTest extends TestLogger {
 
 		int channelIdx = 0;
 		for (int i = 0; i < numberOfRemoteChannels; i++) {
-			res.f1.add(buildRemoteChannel(channelIdx, inputGate, network));
+			ResultPartition partition = PartitionTestUtils.createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 1);
+			closeableRegistry.registerCloseable(partition::close);
+			partition.setup();
+
+			res.f1.add(buildRemoteChannel(channelIdx, inputGate, network, partition));
 			channelIdx++;
 		}
 
 		for (int i = 0; i < numberOfLocalChannels; i++) {
-			buildLocalChannel(channelIdx, inputGate, network);
+			ResultPartition partition = PartitionTestUtils.createPartition(network, ResultPartitionType.PIPELINED_BOUNDED, 1);
+			closeableRegistry.registerCloseable(partition::close);
+			partition.setup();
+
+			buildLocalChannel(channelIdx, inputGate, network, partition);
 		}
-		inputGate.setup();
 		return res;
 	}
 
 	private RemoteInputChannel buildRemoteChannel(
 		int channelIndex,
 		SingleInputGate inputGate,
-		NettyShuffleEnvironment network) {
+		NettyShuffleEnvironment network,
+		ResultPartition partition) {
 		return new InputChannelBuilder()
+			.setPartitionId(partition.getPartitionId())
 			.setChannelIndex(channelIndex)
 			.setupFromNettyShuffleEnvironment(network)
 			.setConnectionManager(new TestingConnectionManager())
@@ -287,8 +303,10 @@ public class InputBuffersMetricsTest extends TestLogger {
 	private void buildLocalChannel(
 		int channelIndex,
 		SingleInputGate inputGate,
-		NettyShuffleEnvironment network) {
+		NettyShuffleEnvironment network,
+		ResultPartition partition) {
 		new InputChannelBuilder()
+			.setPartitionId(partition.getPartitionId())
 			.setChannelIndex(channelIndex)
 			.setupFromNettyShuffleEnvironment(network)
 			.setConnectionManager(new TestingConnectionManager())
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 72404ce..fd7cdd1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -493,7 +493,7 @@ public class LocalInputChannelTest {
 				BufferPool bufferPool,
 				ResultPartitionManager partitionManager,
 				TaskEventDispatcher taskEventDispatcher,
-				ResultPartitionID[] consumedPartitionIds) {
+				ResultPartitionID[] consumedPartitionIds) throws IOException, InterruptedException {
 
 			checkArgument(numberOfInputChannels >= 1);
 			checkArgument(numberOfExpectedBuffersPerChannel >= 1);
@@ -501,11 +501,9 @@ public class LocalInputChannelTest {
 			this.inputGate = new SingleInputGateBuilder()
 				.setConsumedSubpartitionIndex(subpartitionIndex)
 				.setNumberOfChannels(numberOfInputChannels)
+				.setBufferPoolFactory(bufferPool)
 				.build();
 
-			// Set buffer pool
-			inputGate.setBufferPool(bufferPool);
-
 			// Setup input channels
 			for (int i = 0; i < numberOfInputChannels; i++) {
 				InputChannelBuilder.newBuilder()
@@ -516,6 +514,8 @@ public class LocalInputChannelTest {
 					.buildLocalAndSetToGate(inputGate);
 			}
 
+			inputGate.setup();
+
 			this.numberOfInputChannels = numberOfInputChannels;
 			this.numberOfExpectedBuffersPerChannel = numberOfExpectedBuffersPerChannel;
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
index b23af06..956bad9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
@@ -87,6 +87,11 @@ public class SingleInputGateBuilder {
 		return this;
 	}
 
+	public SingleInputGateBuilder setBufferPoolFactory(BufferPool bufferPool) {
+		this.bufferPoolFactory = () -> bufferPool;
+		return this;
+	}
+
 	public SingleInputGate build() {
 		return new SingleInputGate(
 			"Single Input Gate",
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index d737949..cad957f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
@@ -34,6 +35,8 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -419,6 +422,7 @@ public class SingleInputGateTest extends InputGateTestBase {
 			RemoteInputChannel remote =
 				InputChannelBuilder.newBuilder()
 					.setupFromNettyShuffleEnvironment(network)
+					.setConnectionManager(new TestingConnectionManager())
 					.buildRemoteAndSetToGate(inputGate);
 			inputGate.setup();
 
@@ -498,13 +502,27 @@ public class SingleInputGateTest extends InputGateTestBase {
 	@Test
 	public void testUpdateUnknownInputChannel() throws Exception {
 		final NettyShuffleEnvironment network = createNettyShuffleEnvironment();
+
+		final ResultPartition localResultPartition = new ResultPartitionBuilder()
+			.setResultPartitionManager(network.getResultPartitionManager())
+			.setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
+			.build();
+
+		final ResultPartition remoteResultPartition = new ResultPartitionBuilder()
+			.setResultPartitionManager(network.getResultPartitionManager())
+			.setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
+			.build();
+
+		localResultPartition.setup();
+		remoteResultPartition.setup();
+
 		final SingleInputGate inputGate = createInputGate(network, 2, ResultPartitionType.PIPELINED);
 
 		try {
-			final ResultPartitionID localResultPartitionId = new ResultPartitionID();
+			final ResultPartitionID localResultPartitionId = localResultPartition.getPartitionId();
 			addUnknownInputChannel(network, inputGate, localResultPartitionId, 0);
 
-			final ResultPartitionID remoteResultPartitionId = new ResultPartitionID();
+			final ResultPartitionID remoteResultPartitionId = remoteResultPartition.getPartitionId();
 			addUnknownInputChannel(network, inputGate, remoteResultPartitionId, 1);
 
 			inputGate.setup();
@@ -628,6 +646,7 @@ public class SingleInputGateTest extends InputGateTestBase {
 			.setChannelIndex(channelIndex)
 			.setPartitionId(partitionId)
 			.setupFromNettyShuffleEnvironment(network)
+			.setConnectionManager(new TestingConnectionManager())
 			.buildUnknownAndSetToGate(inputGate);
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 46af3e5..ee78963 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -565,7 +565,21 @@ public class TaskTest extends TestLogger {
 	}
 
 	@Test
-	public void testOnPartitionStateUpdate() throws Exception {
+	public void testOnPartitionStateUpdateWhileRunning() throws Exception {
+		testOnPartitionStateUpdate(ExecutionState.RUNNING);
+	}
+
+	/**
+	 * Partition state updates can also happen while the {@link Task} is in the
+	 * {@link ExecutionState#DEPLOYING} state, because partitions are requested while the
+	 * input gates are being set up.
+	 */
+	@Test
+	public void testOnPartitionStateUpdateWhileDeploying() throws Exception {
+		testOnPartitionStateUpdate(ExecutionState.DEPLOYING);
+	}
+
+	public void testOnPartitionStateUpdate(ExecutionState initialTaskState) throws Exception {
 		final ResultPartitionID partitionId = new ResultPartitionID();
 
 		final Task task = createTaskBuilder()
@@ -583,10 +597,10 @@ public class TaskTest extends TestLogger {
 			expected.put(state, ExecutionState.FAILED);
 		}
 
-		expected.put(ExecutionState.RUNNING, ExecutionState.RUNNING);
-		expected.put(ExecutionState.SCHEDULED, ExecutionState.RUNNING);
-		expected.put(ExecutionState.DEPLOYING, ExecutionState.RUNNING);
-		expected.put(ExecutionState.FINISHED, ExecutionState.RUNNING);
+		expected.put(ExecutionState.RUNNING, initialTaskState);
+		expected.put(ExecutionState.SCHEDULED, initialTaskState);
+		expected.put(ExecutionState.DEPLOYING, initialTaskState);
+		expected.put(ExecutionState.FINISHED, initialTaskState);
 
 		expected.put(ExecutionState.CANCELED, ExecutionState.CANCELING);
 		expected.put(ExecutionState.CANCELING, ExecutionState.CANCELING);
@@ -594,7 +608,7 @@ public class TaskTest extends TestLogger {
 
 		int producingStateCounter = 0;
 		for (ExecutionState state : ExecutionState.values()) {
-			setState(task, ExecutionState.RUNNING);
+			setState(task, initialTaskState);
 
 			if (checker.isProducerReadyOrAbortConsumption(task.new PartitionProducerStateResponseHandle(state, null))) {
 				producingStateCounter++;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index 0cdc658..8351b3c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -50,7 +50,6 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
 import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
 
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Arrays;
@@ -231,7 +230,7 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 		return consumableNotifyingPartitionWriter;
 	}
 
-	private InputGate createInputGate(TaskManagerLocation senderLocation) throws IOException {
+	private InputGate createInputGate(TaskManagerLocation senderLocation) throws Exception {
 		InputGate[] gates = new InputGate[channels];
 		for (int channel = 0; channel < channels; ++channel) {
 			final InputGateDeploymentDescriptor gateDescriptor = createInputGateDeploymentDescriptor(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
index 7488688..bb6b9e2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
@@ -77,8 +77,8 @@ public class StreamNetworkPointToPointBenchmark {
 		environment = new StreamNetworkBenchmarkEnvironment<>();
 		environment.setUp(1, 1, false, false, -1, -1, config);
 
-		receiver = environment.createReceiver();
 		recordWriter = environment.createRecordWriter(0, flushTimeout);
+		receiver = environment.createReceiver();
 	}
 
 	/**
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
index 0586f54..b5d1c07 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
@@ -110,7 +110,6 @@ public class StreamNetworkThroughputBenchmark {
 			senderBufferPoolSize,
 			receiverBufferPoolSize,
 			config);
-		receiver = environment.createReceiver();
 		writerThreads = new LongRecordWriterThread[recordWriters];
 		for (int writer = 0; writer < recordWriters; writer++) {
 			writerThreads[writer] = new LongRecordWriterThread(
@@ -118,6 +117,7 @@ public class StreamNetworkThroughputBenchmark {
 				broadcastMode);
 			writerThreads[writer].start();
 		}
+		receiver = environment.createReceiver();
 	}
 
 	/**