You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/04/22 20:40:49 UTC

[13/17] flink git commit: [FLINK-9076] [network] Reduce minimum number of floating buffers to 0

[FLINK-9076] [network] Reduce minimum number of floating buffers to 0

This closes #5874


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c491400b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c491400b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c491400b

Branch: refs/heads/master
Commit: c491400bcb354d242396302a5561920cb525dbae
Parents: 6581914
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Apr 6 11:15:10 2018 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Apr 22 16:28:35 2018 +0200

----------------------------------------------------------------------
 .../flink/configuration/TaskManagerOptions.java |   2 +-
 .../runtime/io/network/NetworkEnvironment.java  |   5 +-
 .../io/network/NetworkEnvironmentTest.java      | 259 ++++++++++++++++---
 .../partition/InputChannelTestUtils.java        |   2 +-
 .../StreamNetworkBenchmarkEnvironment.java      |  34 ++-
 .../StreamNetworkPointToPointBenchmark.java     |   2 +-
 .../StreamNetworkThroughputBenchmark.java       |  14 +-
 .../StreamNetworkThroughputBenchmarkTest.java   |  45 ++++
 ...SuccessAfterNetworkBuffersFailureITCase.java |   2 +-
 9 files changed, 318 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c491400b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index 2bd3091..b726e07 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -275,7 +275,7 @@ public class TaskManagerOptions {
 	public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
 			key("taskmanager.network.memory.buffers-per-channel")
 			.defaultValue(2)
-			.withDescription("Number of network buffers to use for each outgoing/incoming channel (subpartition/input channel)." +
+			.withDescription("Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel)." +
 				"In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be" +
 				" configured at least 2 for good performance. 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is" +
 				" for parallel serialization.");

http://git-wip-us.apache.org/repos/asf/flink/blob/c491400b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 23ac911..0a9dc0f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -234,10 +234,9 @@ public class NetworkEnvironment {
 				maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
 					extraNetworkBuffersPerGate : Integer.MAX_VALUE;
 
-				// Create a buffer pool for floating buffers and assign exclusive buffers to input channels directly
-				bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate,
-					maxNumberOfMemorySegments);
+				// assign exclusive buffers to input channels directly and use the rest for floating buffers
 				gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
+				bufferPool = networkBufferPool.createBufferPool(0, maxNumberOfMemorySegments);
 			} else {
 				maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
 					gate.getNumberOfInputChannels() * networkBuffersPerChannel +

http://git-wip-us.apache.org/repos/asf/flink/blob/c491400b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index 24878ef..317a214 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -19,39 +19,60 @@
 package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 
+import org.junit.Rule;
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createDummyConnectionManager;
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.spy;
 
 /**
  * Various tests for the {@link NetworkEnvironment} class.
  */
+@RunWith(Parameterized.class)
 public class NetworkEnvironmentTest {
 	private static final int numBuffers = 1024;
 
 	private static final int memorySegmentSize = 128;
 
+	@Parameterized.Parameter
+	public boolean enableCreditBasedFlowControl;
+
+	@Parameterized.Parameters(name = "Credit-based = {0}")
+	public static List<Boolean> parameters() {
+		return Arrays.asList(Boolean.TRUE, Boolean.FALSE);
+	}
+
+	@Rule
+	public ExpectedException expectedException = ExpectedException.none();
+
 	/**
 	 * Verifies that {@link NetworkEnvironment#registerTask(Task)} sets up (un)bounded buffer pool
 	 * instances for various types of input and output channels.
@@ -72,7 +93,7 @@ public class NetworkEnvironmentTest {
 			0,
 			2,
 			8,
-			true);
+			enableCreditBasedFlowControl);
 
 		// result partitions
 		ResultPartition rp1 = createResultPartition(ResultPartitionType.PIPELINED, 2);
@@ -82,10 +103,11 @@ public class NetworkEnvironmentTest {
 		final ResultPartition[] resultPartitions = new ResultPartition[] {rp1, rp2, rp3, rp4};
 
 		// input gates
-		SingleInputGate ig1 = createSingleInputGateMock(ResultPartitionType.PIPELINED, 2);
-		SingleInputGate ig2 = createSingleInputGateMock(ResultPartitionType.BLOCKING, 2);
-		SingleInputGate ig3 = createSingleInputGateMock(ResultPartitionType.PIPELINED_BOUNDED, 2);
-		final SingleInputGate[] inputGates = new SingleInputGate[] {ig1, ig2, ig3};
+		SingleInputGate ig1 = createSingleInputGate(ResultPartitionType.PIPELINED, 2);
+		SingleInputGate ig2 = createSingleInputGate(ResultPartitionType.BLOCKING, 2);
+		SingleInputGate ig3 = createSingleInputGate(ResultPartitionType.PIPELINED_BOUNDED, 2);
+		SingleInputGate ig4 = createSingleInputGate(ResultPartitionType.PIPELINED_BOUNDED, 8);
+		final SingleInputGate[] inputGates = new SingleInputGate[] {ig1, ig2, ig3, ig4};
 
 		// overall task to register
 		Task task = mock(Task.class);
@@ -94,15 +116,177 @@ public class NetworkEnvironmentTest {
 
 		network.registerTask(task);
 
+		// verify buffer pools for the result partitions
+		assertEquals(rp1.getNumberOfSubpartitions(), rp1.getBufferPool().getNumberOfRequiredMemorySegments());
+		assertEquals(rp2.getNumberOfSubpartitions(), rp2.getBufferPool().getNumberOfRequiredMemorySegments());
+		assertEquals(rp3.getNumberOfSubpartitions(), rp3.getBufferPool().getNumberOfRequiredMemorySegments());
+		assertEquals(rp4.getNumberOfSubpartitions(), rp4.getBufferPool().getNumberOfRequiredMemorySegments());
+
 		assertEquals(Integer.MAX_VALUE, rp1.getBufferPool().getMaxNumberOfMemorySegments());
 		assertEquals(Integer.MAX_VALUE, rp2.getBufferPool().getMaxNumberOfMemorySegments());
 		assertEquals(2 * 2 + 8, rp3.getBufferPool().getMaxNumberOfMemorySegments());
 		assertEquals(8 * 2 + 8, rp4.getBufferPool().getMaxNumberOfMemorySegments());
 
-		verify(ig1, times(1)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
-		verify(ig2, times(1)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
-		verify(ig3, times(1)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
+		// verify buffer pools for the input gates (NOTE: credit-based uses minimum required buffers
+		// for exclusive buffers not managed by the buffer pool)
+		assertEquals(enableCreditBasedFlowControl ? 0 : 2, ig1.getBufferPool().getNumberOfRequiredMemorySegments());
+		assertEquals(enableCreditBasedFlowControl ? 0 : 2, ig2.getBufferPool().getNumberOfRequiredMemorySegments());
+		assertEquals(enableCreditBasedFlowControl ? 0 : 2, ig3.getBufferPool().getNumberOfRequiredMemorySegments());
+		assertEquals(enableCreditBasedFlowControl ? 0 : 8, ig4.getBufferPool().getNumberOfRequiredMemorySegments());
+
+		assertEquals(Integer.MAX_VALUE, ig1.getBufferPool().getMaxNumberOfMemorySegments());
+		assertEquals(Integer.MAX_VALUE, ig2.getBufferPool().getMaxNumberOfMemorySegments());
+		assertEquals(enableCreditBasedFlowControl ? 8 : 2 * 2 + 8, ig3.getBufferPool().getMaxNumberOfMemorySegments());
+		assertEquals(enableCreditBasedFlowControl ? 8 : 8 * 2 + 8, ig4.getBufferPool().getMaxNumberOfMemorySegments());
+
+		int invokations = enableCreditBasedFlowControl ? 1 : 0;
+		verify(ig1, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
+		verify(ig2, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
+		verify(ig3, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
+		verify(ig4, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
+
+		for (ResultPartition rp : resultPartitions) {
+			rp.release();
+		}
+		for (SingleInputGate ig : inputGates) {
+			ig.releaseAllResources();
+		}
+		network.shutdown();
+	}
+
+	/**
+	 * Verifies that {@link NetworkEnvironment#registerTask(Task)} sets up (un)bounded buffer pool
+	 * instances for various types of input and output channels working with the bare minimum of
+	 * required buffers.
+	 */
+	@Test
+	public void testRegisterTaskWithLimitedBuffers() throws Exception {
+		final int bufferCount;
+		// outgoing: 1 buffer per channel (always)
+		if (!enableCreditBasedFlowControl) {
+			// incoming: 1 buffer per channel
+			bufferCount = 20;
+		} else {
+			// incoming: 2 exclusive buffers per channel
+			bufferCount = 10 + 10 * TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue();
+		}
+
+		testRegisterTaskWithLimitedBuffers(bufferCount);
+	}
+
+	/**
+	 * Verifies that {@link NetworkEnvironment#registerTask(Task)} fails if the bare minimum of
+	 * required buffers is not available (we are one buffer short).
+	 */
+	@Test
+	public void testRegisterTaskWithInsufficientBuffers() throws Exception {
+		final int bufferCount;
+		// outgoing: 1 buffer per channel (always)
+		if (!enableCreditBasedFlowControl) {
+			// incoming: 1 buffer per channel
+			bufferCount = 19;
+		} else {
+			// incoming: 2 exclusive buffers per channel
+			bufferCount = 10 + 10 * TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() - 1;
+		}
+
+		expectedException.expect(IOException.class);
+		expectedException.expectMessage("Insufficient number of network buffers");
+		testRegisterTaskWithLimitedBuffers(bufferCount);
+	}
+
+	private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize) throws Exception {
+		final NetworkEnvironment network = new NetworkEnvironment(
+			new NetworkBufferPool(bufferPoolSize, memorySegmentSize),
+			new LocalConnectionManager(),
+			new ResultPartitionManager(),
+			new TaskEventDispatcher(),
+			new KvStateRegistry(),
+			null,
+			null,
+			IOManager.IOMode.SYNC,
+			0,
+			0,
+			2,
+			8,
+			enableCreditBasedFlowControl);
+
+		final ConnectionManager connManager = createDummyConnectionManager();
+
+		// result partitions
+		ResultPartition rp1 = createResultPartition(ResultPartitionType.PIPELINED, 2);
+		ResultPartition rp2 = createResultPartition(ResultPartitionType.BLOCKING, 2);
+		ResultPartition rp3 = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 2);
+		ResultPartition rp4 = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 4);
+		final ResultPartition[] resultPartitions = new ResultPartition[] {rp1, rp2, rp3, rp4};
+
+		// input gates
+		SingleInputGate ig1 = createSingleInputGate(ResultPartitionType.PIPELINED, 2);
+		SingleInputGate ig2 = createSingleInputGate(ResultPartitionType.BLOCKING, 2);
+		SingleInputGate ig3 = createSingleInputGate(ResultPartitionType.PIPELINED_BOUNDED, 2);
+		SingleInputGate ig4 = createSingleInputGate(ResultPartitionType.PIPELINED_BOUNDED, 4);
+		final SingleInputGate[] inputGates = new SingleInputGate[] {ig1, ig2, ig3, ig4};
+
+		// set up remote input channels for the exclusive buffers of the credit-based flow control
+		// (note that this does not obey the partition types which is ok for the scope of the test)
+		if (enableCreditBasedFlowControl) {
+			createRemoteInputChannel(ig4, 0, rp1, connManager);
+			createRemoteInputChannel(ig4, 0, rp2, connManager);
+			createRemoteInputChannel(ig4, 0, rp3, connManager);
+			createRemoteInputChannel(ig4, 0, rp4, connManager);
 
+			createRemoteInputChannel(ig1, 1, rp1, connManager);
+			createRemoteInputChannel(ig1, 1, rp4, connManager);
+
+			createRemoteInputChannel(ig2, 1, rp2, connManager);
+			createRemoteInputChannel(ig2, 2, rp4, connManager);
+
+			createRemoteInputChannel(ig3, 1, rp3, connManager);
+			createRemoteInputChannel(ig3, 3, rp4, connManager);
+		}
+
+		// overall task to register
+		Task task = mock(Task.class);
+		when(task.getProducedPartitions()).thenReturn(resultPartitions);
+		when(task.getAllInputGates()).thenReturn(inputGates);
+
+		network.registerTask(task);
+
+		// verify buffer pools for the result partitions
+		assertEquals(Integer.MAX_VALUE, rp1.getBufferPool().getMaxNumberOfMemorySegments());
+		assertEquals(Integer.MAX_VALUE, rp2.getBufferPool().getMaxNumberOfMemorySegments());
+		assertEquals(2 * 2 + 8, rp3.getBufferPool().getMaxNumberOfMemorySegments());
+		assertEquals(4 * 2 + 8, rp4.getBufferPool().getMaxNumberOfMemorySegments());
+
+		for (ResultPartition rp : resultPartitions) {
+			assertEquals(rp.getNumberOfSubpartitions(), rp.getBufferPool().getNumberOfRequiredMemorySegments());
+			assertEquals(rp.getNumberOfSubpartitions(), rp.getBufferPool().getNumBuffers());
+		}
+
+		// verify buffer pools for the input gates (NOTE: credit-based uses minimum required buffers
+		// for exclusive buffers not managed by the buffer pool)
+		assertEquals(enableCreditBasedFlowControl ? 0 : 2, ig1.getBufferPool().getNumberOfRequiredMemorySegments());
+		assertEquals(enableCreditBasedFlowControl ? 0 : 2, ig2.getBufferPool().getNumberOfRequiredMemorySegments());
+		assertEquals(enableCreditBasedFlowControl ? 0 : 2, ig3.getBufferPool().getNumberOfRequiredMemorySegments());
+		assertEquals(enableCreditBasedFlowControl ? 0 : 4, ig4.getBufferPool().getNumberOfRequiredMemorySegments());
+
+		assertEquals(Integer.MAX_VALUE, ig1.getBufferPool().getMaxNumberOfMemorySegments());
+		assertEquals(Integer.MAX_VALUE, ig2.getBufferPool().getMaxNumberOfMemorySegments());
+		assertEquals(enableCreditBasedFlowControl ? 8 : 2 * 2 + 8, ig3.getBufferPool().getMaxNumberOfMemorySegments());
+		assertEquals(enableCreditBasedFlowControl ? 8 : 4 * 2 + 8, ig4.getBufferPool().getMaxNumberOfMemorySegments());
+
+		int invokations = enableCreditBasedFlowControl ? 1 : 0;
+		verify(ig1, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
+		verify(ig2, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
+		verify(ig3, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
+		verify(ig4, times(invokations)).assignExclusiveSegments(network.getNetworkBufferPool(), 2);
+
+		for (ResultPartition rp : resultPartitions) {
+			rp.release();
+		}
+		for (SingleInputGate ig : inputGates) {
+			ig.releaseAllResources();
+		}
 		network.shutdown();
 	}
 
@@ -135,7 +319,7 @@ public class NetworkEnvironmentTest {
 	}
 
 	/**
-	 * Helper to create a mock of a {@link SingleInputGate} for use by a {@link Task} inside
+	 * Helper to create a spy of a {@link SingleInputGate} for use by a {@link Task} inside
 	 * {@link NetworkEnvironment#registerTask(Task)}.
 	 *
 	 * @param partitionType
@@ -143,26 +327,35 @@ public class NetworkEnvironmentTest {
 	 * @param channels
 	 * 		the number of input channels
 	 *
-	 * @return mock with minimal functionality necessary by {@link NetworkEnvironment#registerTask(Task)}
+	 * @return input gate with some fake settings
 	 */
-	private static SingleInputGate createSingleInputGateMock(
+	private static SingleInputGate createSingleInputGate(
 			final ResultPartitionType partitionType, final int channels) {
-		SingleInputGate ig = mock(SingleInputGate.class);
-		when(ig.getConsumedPartitionType()).thenReturn(partitionType);
-		when(ig.getNumberOfInputChannels()).thenReturn(channels);
-		doAnswer(new Answer<Void>() {
-			@Override
-			public Void answer(final InvocationOnMock invocation) throws Throwable {
-				BufferPool bp = invocation.getArgumentAt(0, BufferPool.class);
-				if (partitionType == ResultPartitionType.PIPELINED_BOUNDED) {
-					assertEquals(8, bp.getMaxNumberOfMemorySegments());
-				} else {
-					assertEquals(Integer.MAX_VALUE, bp.getMaxNumberOfMemorySegments());
-				}
-				return null;
-			}
-		}).when(ig).setBufferPool(any(BufferPool.class));
-
-		return ig;
+		return spy(new SingleInputGate(
+			"Test Task Name",
+			new JobID(),
+			new IntermediateDataSetID(),
+			partitionType,
+			0,
+			channels,
+			mock(TaskActions.class),
+			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()));
+	}
+
+	private static void createRemoteInputChannel(
+			SingleInputGate inputGate,
+			int channelIndex,
+			ResultPartition resultPartition,
+			ConnectionManager connManager) {
+		RemoteInputChannel channel = new RemoteInputChannel(
+			inputGate,
+			channelIndex,
+			resultPartition.getPartitionId(),
+			mock(ConnectionID.class),
+			connManager,
+			0,
+			0,
+			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
+		inputGate.setInputChannel(resultPartition.getPartitionId().getPartitionId(), channel);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c491400b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index 31effe1..f73ede7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -33,7 +33,7 @@ import static org.mockito.Mockito.when;
 /**
  * Some utility methods used for testing InputChannels and InputGates.
  */
-class InputChannelTestUtils {
+public class InputChannelTestUtils {
 
 	/**
 	 * Creates a result partition manager that ignores all IDs, and simply returns the given

http://git-wip-us.apache.org/repos/asf/flink/blob/c491400b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index b1613f2..3e6dcf6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -93,21 +93,45 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 
 	protected ResultPartitionID[] partitionIds;
 
-	public void setUp(int writers, int channels, boolean localMode) throws Exception {
+	/**
+	 * Sets up the environment including buffer pools and netty threads.
+	 *
+	 * @param writers
+	 * 		number of writers
+	 * @param channels
+	 * 		outgoing channels per writer
+	 * @param localMode
+	 * 		only local channels?
+	 * @param senderBufferPoolSize
+	 * 		buffer pool size for the sender (set to <tt>-1</tt> for default)
+	 * @param receiverBufferPoolSize
+	 * 		buffer pool size for the receiver (set to <tt>-1</tt> for default)
+	 */
+	public void setUp(
+			int writers,
+			int channels,
+			boolean localMode,
+			int senderBufferPoolSize,
+			int receiverBufferPoolSize) throws Exception {
 		this.localMode = localMode;
 		this.channels = channels;
 		this.partitionIds = new ResultPartitionID[writers];
+		if (senderBufferPoolSize == -1) {
+			senderBufferPoolSize = Math.max(2048, writers * channels * 4);
+		}
+		if (receiverBufferPoolSize == -1) {
+			receiverBufferPoolSize = Math.max(2048, writers * channels * 4);
+		}
 
 		ioManager = new IOManagerAsync();
 
-		int bufferPoolSize = Math.max(2048, writers * channels * 4);
-		senderEnv = createNettyNetworkEnvironment(bufferPoolSize);
+		senderEnv = createNettyNetworkEnvironment(senderBufferPoolSize);
 		senderEnv.start();
-		if (localMode) {
+		if (localMode && senderBufferPoolSize == receiverBufferPoolSize) {
 			receiverEnv = senderEnv;
 		}
 		else {
-			receiverEnv = createNettyNetworkEnvironment(bufferPoolSize);
+			receiverEnv = createNettyNetworkEnvironment(receiverBufferPoolSize);
 			receiverEnv.start();
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c491400b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
index 7aa218e..6b96c62 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkPointToPointBenchmark.java
@@ -70,7 +70,7 @@ public class StreamNetworkPointToPointBenchmark {
 	 */
 	public void setUp(long flushTimeout) throws Exception {
 		environment = new StreamNetworkBenchmarkEnvironment<>();
-		environment.setUp(1, 1, false);
+		environment.setUp(1, 1, false, -1, -1);
 
 		receiver = environment.createReceiver();
 		recordWriter = environment.createRecordWriter(0, flushTimeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/c491400b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
index fe08993..1b0ef8a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmark.java
@@ -59,6 +59,10 @@ public class StreamNetworkThroughputBenchmark {
 		setUp(recordWriters, channels, flushTimeout, false);
 	}
 
+	public void setUp(int recordWriters, int channels, int flushTimeout, boolean localMode) throws Exception {
+		setUp(recordWriters, channels, flushTimeout, localMode, -1, -1); // forward localMode; -1 = default buffer pool sizes
+	}
+
 	/**
 	 * Initializes the throughput benchmark with the given parameters.
 	 *
@@ -68,9 +72,15 @@ public class StreamNetworkThroughputBenchmark {
 	 * @param channels
 	 * 		number of outgoing channels / receivers
 	 */
-	public void setUp(int recordWriters, int channels, int flushTimeout, boolean localMode) throws Exception {
+	public void setUp(
+			int recordWriters,
+			int channels,
+			int flushTimeout,
+			boolean localMode,
+			int senderBufferPoolSize,
+			int receiverBufferPoolSize) throws Exception {
 		environment = new StreamNetworkBenchmarkEnvironment<>();
-		environment.setUp(recordWriters, channels, localMode);
+		environment.setUp(recordWriters, channels, localMode, senderBufferPoolSize, receiverBufferPoolSize);
 		receiver = environment.createReceiver();
 		writerThreads = new LongRecordWriterThread[recordWriters];
 		for (int writer = 0; writer < recordWriters; writer++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c491400b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java
index ba8fe27..dac8ee2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTest.java
@@ -18,12 +18,21 @@
 
 package org.apache.flink.streaming.runtime.io.benchmark;
 
+import org.apache.flink.configuration.TaskManagerOptions;
+
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
 
 /**
  * Tests for various network benchmarks based on {@link StreamNetworkThroughputBenchmark}.
  */
 public class StreamNetworkThroughputBenchmarkTest {
+	@Rule
+	public ExpectedException expectedException = ExpectedException.none();
+
 	@Test
 	public void pointToPointBenchmark() throws Exception {
 		StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
@@ -61,6 +70,42 @@ public class StreamNetworkThroughputBenchmarkTest {
 	}
 
 	@Test
+	public void remoteModeInsufficientBuffersSender() throws Exception {
+		StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark();
+		int writers = 2;
+		int channels = 2;
+
+		expectedException.expect(IOException.class);
+		expectedException.expectMessage("Insufficient number of network buffers");
+
+		env.setUp(writers, channels, 100, false, writers * channels - 1, writers * channels * TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue());
+	}
+
+	@Test
+	public void remoteModeInsufficientBuffersReceiver() throws Exception {
+		StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark();
+		int writers = 2;
+		int channels = 2;
+
+		expectedException.expect(IOException.class);
+		expectedException.expectMessage("Insufficient number of network buffers");
+
+		env.setUp(writers, channels, 100, false, writers * channels, writers * channels * TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue() - 1);
+	}
+
+	@Test
+	public void remoteModeMinimumBuffers() throws Exception {
+		StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark();
+		int writers = 2;
+		int channels = 2;
+
+		env.setUp(writers, channels, 100, false, writers * channels, writers * channels *
+			TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue());
+		env.executeBenchmark(10_000);
+		env.tearDown();
+	}
+
+	@Test
 	public void pointToMultiPointBenchmark() throws Exception {
 		StreamNetworkThroughputBenchmark benchmark = new StreamNetworkThroughputBenchmark();
 		benchmark.setUp(1, 100, 100);

http://git-wip-us.apache.org/repos/asf/flink/blob/c491400b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
index dbd0f79..16159c1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java
@@ -60,7 +60,7 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger {
 	private static Configuration getConfiguration() {
 		Configuration config = new Configuration();
 		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L);
-		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1024);
+		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800);
 		return config;
 	}