Posted to commits@flink.apache.org by se...@apache.org on 2017/03/10 13:42:48 UTC

[6/7] flink git commit: [FLINK-4545] [network] Allow LocalBufferPool to limit the number of used buffers

[FLINK-4545] [network] Allow LocalBufferPool to limit the number of used buffers

Use "a * <number of channels> + b" buffers for bounded pipelined partitions:

Default: a = 2, b = 8
* 1 buffer for in-flight data in the subpartition/input channel
* 1 buffer for parallel serialization
* plus some extra buffers per gate (8 seems like a good default given the bandwidth-delay products of current networks)

This closes #3480
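
To make the sizing formula above concrete, here is a minimal standalone sketch (illustration only, not part of the commit; class and variable names are made up) that computes the buffer pool limit a bounded pipelined partition or input gate gets with the defaults a = 2 and b = 8:

    // Sketch: buffer pool limit for a bounded pipelined partition/gate,
    // mirroring maxNumberOfMemorySegments in the NetworkEnvironment change below.
    public class BoundedPoolSizeSketch {
        public static void main(String[] args) {
            int buffersPerChannel = 2;   // taskmanager.net.memory.buffers-per-channel (a)
            int extraBuffersPerGate = 8; // taskmanager.net.memory.extra-buffers-per-gate (b)
            int numberOfChannels = 100;  // e.g. 100 subpartitions or input channels

            int maxBuffers = numberOfChannels * buffersPerChannel + extraBuffersPerGate;
            System.out.println(maxBuffers); // 208; unbounded partitions keep Integer.MAX_VALUE
        }
    }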


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/11e2aa6d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/11e2aa6d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/11e2aa6d

Branch: refs/heads/master
Commit: 11e2aa6dcdbf42992cda57a5b50d5c29b4facf2d
Parents: e5e6da0
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Feb 10 14:53:09 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 10 13:30:58 2017 +0100

----------------------------------------------------------------------
 .../flink/configuration/TaskManagerOptions.java |  14 ++
 .../BackPressureStatsTrackerITCase.java         |   2 +-
 .../InputGateDeploymentDescriptor.java          |  15 ++
 .../runtime/executiongraph/ExecutionVertex.java |   7 +-
 .../runtime/io/network/NetworkEnvironment.java  |  23 ++-
 .../runtime/io/network/buffer/BufferPool.java   |  10 +-
 .../io/network/buffer/BufferPoolFactory.java    |   7 +-
 .../io/network/buffer/LocalBufferPool.java      |  57 ++++++-
 .../io/network/buffer/NetworkBufferPool.java    |  64 +++++--
 .../netty/PartitionRequestServerHandler.java    |   2 +-
 .../io/network/partition/ResultPartition.java   |  10 ++
 .../network/partition/ResultPartitionType.java  |  32 +++-
 .../partition/consumer/SingleInputGate.java     |  18 +-
 .../taskexecutor/TaskManagerServices.java       |   4 +-
 .../TaskManagerServicesConfiguration.java       |  17 +-
 .../NetworkEnvironmentConfiguration.scala       |   2 +
 .../io/network/MockNetworkEnvironment.java      |   2 +-
 .../io/network/NetworkEnvironmentTest.java      | 165 ++++++++++++++++++
 .../io/network/api/writer/RecordWriterTest.java |   2 +-
 .../network/buffer/BufferPoolFactoryTest.java   | 170 ++++++++++++++++---
 .../io/network/buffer/LocalBufferPoolTest.java  |  58 +++++++
 .../network/buffer/NetworkBufferPoolTest.java   |  96 ++++++++---
 .../partition/InputGateConcurrentTest.java      |   6 +-
 .../partition/InputGateFairnessTest.java        |   3 +-
 .../consumer/LocalInputChannelTest.java         |   6 +-
 .../partition/consumer/SingleInputGateTest.java |  24 ++-
 .../partition/consumer/TestSingleInputGate.java |   2 +
 .../partition/consumer/UnionInputGateTest.java  |  15 +-
 ...askManagerComponentsStartupShutdownTest.java |   7 +-
 .../runtime/taskmanager/TaskManagerTest.java    |  29 +++-
 .../api/graph/StreamingJobGraphGenerator.java   |   6 +-
 .../graph/StreamingJobGraphGeneratorTest.java   |  21 ++-
 .../io/BarrierBufferMassiveRandomTest.java      |   4 +-
 33 files changed, 790 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index b7ee20a..b891e35 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -53,6 +53,20 @@ public class TaskManagerOptions {
 			key("taskmanager.net.request-backoff.max")
 			.defaultValue(10000);
 
+	/**
+	 * Number of network buffers to use for each outgoing/ingoing channel (subpartition/input channel).
+	 *
+	 * Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization
+	 */
+	public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
+		key("taskmanager.net.memory.buffers-per-channel")
+			.defaultValue(2);
+
+	/** Number of extra network buffers to use for each outgoing/ingoing gate (result partition/input gate). */
+	public static final ConfigOption<Integer> NETWORK_EXTRA_BUFFERS_PER_GATE =
+		key("taskmanager.net.memory.extra-buffers-per-gate")
+			.defaultValue(8);
+
 	// ------------------------------------------------------------------------
 	//  Task Options
 	// ------------------------------------------------------------------------
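
As a usage sketch only (not part of the commit), the two options above can be read like any other ConfigOption; the override via the plain string key is just an example, while the real wiring happens in TaskManagerServicesConfiguration further down:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.TaskManagerOptions;

    public class ReadBufferOptionsSketch {
        public static void main(String[] args) {
            Configuration config = new Configuration();
            // override the default of 2; omit this line to keep the defaults
            config.setInteger("taskmanager.net.memory.buffers-per-channel", 4);

            int perChannel = config.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
            int perGate = config.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
            System.out.println(perChannel + " buffers/channel, " + perGate + " extra/gate"); // 4, 8
        }
    }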

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
index 1943129..46f8be6 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -108,7 +108,7 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
 			//
 			// 1) Consume all buffers at first (no buffers for the test task)
 			//
-			testBufferPool = networkBufferPool.createBufferPool(1);
+			testBufferPool = networkBufferPool.createBufferPool(1, Integer.MAX_VALUE);
 			final List<Buffer> buffers = new ArrayList<>();
 			while (true) {
 				Buffer buffer = testBufferPool.requestBuffer();

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
index dde1ed7..9bf724a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.deployment;
 
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -46,6 +47,9 @@ public class InputGateDeploymentDescriptor implements Serializable {
 	 */
 	private final IntermediateDataSetID consumedResultId;
 
+	/** The type of the partition the input gate is going to consume. */
+	private final ResultPartitionType consumedPartitionType;
+
 	/**
 	 * The index of the consumed subpartition of each consumed partition. This index depends on the
 	 * {@link DistributionPattern} and the subtask indices of the producing and consuming task.
@@ -57,10 +61,12 @@ public class InputGateDeploymentDescriptor implements Serializable {
 
 	public InputGateDeploymentDescriptor(
 			IntermediateDataSetID consumedResultId,
+			ResultPartitionType consumedPartitionType,
 			int consumedSubpartitionIndex,
 			InputChannelDeploymentDescriptor[] inputChannels) {
 
 		this.consumedResultId = checkNotNull(consumedResultId);
+		this.consumedPartitionType = checkNotNull(consumedPartitionType);
 
 		checkArgument(consumedSubpartitionIndex >= 0);
 		this.consumedSubpartitionIndex = consumedSubpartitionIndex;
@@ -72,6 +78,15 @@ public class InputGateDeploymentDescriptor implements Serializable {
 		return consumedResultId;
 	}
 
+	/**
+	 * Returns the type of this input channel's consumed result partition.
+	 *
+	 * @return consumed result partition type
+	 */
+	public ResultPartitionType getConsumedPartitionType() {
+		return consumedPartitionType;
+	}
+
 	public int getConsumedSubpartitionIndex() {
 		return consumedSubpartitionIndex;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index ca8e07c..21af73a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -687,9 +688,11 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 
 			int queueToRequest = subTaskIndex % numConsumerEdges;
 
-			IntermediateDataSetID resultId = edges[0].getSource().getIntermediateResult().getId();
+			IntermediateResult consumedIntermediateResult = edges[0].getSource().getIntermediateResult();
+			final IntermediateDataSetID resultId = consumedIntermediateResult.getId();
+			final ResultPartitionType partitionType = consumedIntermediateResult.getResultType();
 
-			consumedPartitions.add(new InputGateDeploymentDescriptor(resultId, queueToRequest, partitions));
+			consumedPartitions.add(new InputGateDeploymentDescriptor(resultId, partitionType, queueToRequest, partitions));
 		}
 
 		SerializedValue<JobInformation> serializedJobInformation = getExecutionGraph().getSerializedJobInformation();

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 8e85ffe..4d4b305 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -73,6 +73,11 @@ public class NetworkEnvironment {
 
 	private final int partitionRequestMaxBackoff;
 
+	/** Number of network buffers to use for each outgoing/ingoing channel (subpartition/input channel). */
+	private final int networkBuffersPerChannel;
+	/** Number of extra network buffers to use for each outgoing/ingoing gate (result partition/input gate). */
+	private final int extraNetworkBuffersPerGate;
+
 	private boolean isShutdown;
 
 	public NetworkEnvironment(
@@ -84,7 +89,9 @@ public class NetworkEnvironment {
 			KvStateServer kvStateServer,
 			IOMode defaultIOMode,
 			int partitionRequestInitialBackoff,
-			int partitionRequestMaxBackoff) {
+			int partitionRequestMaxBackoff,
+			int networkBuffersPerChannel,
+			int extraNetworkBuffersPerGate) {
 
 		this.networkBufferPool = checkNotNull(networkBufferPool);
 		this.connectionManager = checkNotNull(connectionManager);
@@ -100,6 +107,8 @@ public class NetworkEnvironment {
 		this.partitionRequestMaxBackoff = partitionRequestMaxBackoff;
 
 		isShutdown = false;
+		this.networkBuffersPerChannel = networkBuffersPerChannel;
+		this.extraNetworkBuffersPerGate = extraNetworkBuffersPerGate;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -171,7 +180,11 @@ public class NetworkEnvironment {
 				BufferPool bufferPool = null;
 
 				try {
-					bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions());
+					int maxNumberOfMemorySegments = partition.getPartitionType().isBounded() ?
+						partition.getNumberOfSubpartitions() * networkBuffersPerChannel +
+							extraNetworkBuffersPerGate : Integer.MAX_VALUE;
+					bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(),
+							maxNumberOfMemorySegments);
 					partition.registerBufferPool(bufferPool);
 
 					resultPartitionManager.registerResultPartition(partition);
@@ -198,7 +211,11 @@ public class NetworkEnvironment {
 				BufferPool bufferPool = null;
 
 				try {
-					bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels());
+					int maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
+						gate.getNumberOfInputChannels() * networkBuffersPerChannel +
+							extraNetworkBuffersPerGate : Integer.MAX_VALUE;
+					bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(),
+						maxNumberOfMemorySegments);
 					gate.setBufferPool(bufferPool);
 				} catch (Throwable t) {
 					if (bufferPool != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
index 8784b14..a4928ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
@@ -50,6 +50,13 @@ public interface BufferPool extends BufferProvider, BufferRecycler {
 	int getNumberOfRequiredMemorySegments();
 
 	/**
+	 * Returns the maximum number of memory segments this buffer pool should use
+	 *
+	 * @return maximum number of memory segments to use or <tt>Integer.MAX_VALUE</tt> if unlimited
+	 */
+	int getMaxNumberOfMemorySegments();
+
+	/**
 	 * Returns the current size of this buffer pool.
 	 *
 	 * <p> The size of the buffer pool can change dynamically at runtime.
@@ -59,7 +66,7 @@ public interface BufferPool extends BufferProvider, BufferRecycler {
 	/**
 	 * Sets the current size of this buffer pool.
 	 *
-	 * <p> The size needs to be greater or equals to the guaranteed number of memory segments.
+	 * <p> The size needs to be greater or equal to the guaranteed number of memory segments.
 	 */
 	void setNumBuffers(int numBuffers) throws IOException;
 
@@ -72,5 +79,4 @@ public interface BufferPool extends BufferProvider, BufferRecycler {
 	 * Returns the number of used buffers of this buffer pool.
 	 */
 	int bestEffortGetNumOfUsedBuffers();
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
index e953158..ffed432 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
@@ -30,8 +30,13 @@ public interface BufferPoolFactory {
 	 * buffers.
 	 *
 	 * <p> The buffer pool is of dynamic size with at least <tt>numRequiredBuffers</tt> buffers.
+	 *
+	 * @param numRequiredBuffers
+	 * 		minimum number of network buffers in this pool
+	 * @param maxUsedBuffers
+	 * 		maximum number of network buffers this pool offers
 	 */
-	BufferPool createBufferPool(int numRequiredBuffers) throws IOException;
+	BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers) throws IOException;
 
 	/**
 	 * Destroy callback for updating factory book keeping.

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index d6a4cf7..a587997 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.io.network.buffer;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.util.event.EventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -44,6 +46,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * match its new size.
  */
 class LocalBufferPool implements BufferPool {
+	private static final Logger LOG = LoggerFactory.getLogger(LocalBufferPool.class);
 
 	/** Global network buffer pool to get buffers from. */
 	private final NetworkBufferPool networkBufferPool;
@@ -63,6 +66,9 @@ class LocalBufferPool implements BufferPool {
 	 */
 	private final Queue<EventListener<Buffer>> registeredListeners = new ArrayDeque<EventListener<Buffer>>();
 
+	/** Maximum number of network buffers to allocate. */
+	private final int maxNumberOfMemorySegments;
+
 	/** The current size of this pool */
 	private int currentPoolSize;
 
@@ -86,9 +92,37 @@ class LocalBufferPool implements BufferPool {
 	 * 		minimum number of network buffers
 	 */
 	LocalBufferPool(NetworkBufferPool networkBufferPool, int numberOfRequiredMemorySegments) {
+		this(networkBufferPool, numberOfRequiredMemorySegments, Integer.MAX_VALUE);
+	}
+
+	/**
+	 * Local buffer pool based on the given <tt>networkBufferPool</tt> with a minimal and maximal
+	 * number of network buffers being available.
+	 *
+	 * @param networkBufferPool
+	 * 		global network buffer pool to get buffers from
+	 * @param numberOfRequiredMemorySegments
+	 * 		minimum number of network buffers
+	 * @param maxNumberOfMemorySegments
+	 * 		maximum number of network buffers to allocate
+	 */
+	LocalBufferPool(NetworkBufferPool networkBufferPool, int numberOfRequiredMemorySegments,
+			int maxNumberOfMemorySegments) {
+		checkArgument(maxNumberOfMemorySegments >= numberOfRequiredMemorySegments,
+			"Maximum number of memory segments (%s) should not be smaller than minimum (%s).",
+			maxNumberOfMemorySegments, numberOfRequiredMemorySegments);
+
+		checkArgument(maxNumberOfMemorySegments > 0,
+			"Maximum number of memory segments (%s) should be larger than 0.",
+			maxNumberOfMemorySegments);
+
+		LOG.debug("Using a local buffer pool with {}-{} buffers",
+			numberOfRequiredMemorySegments, maxNumberOfMemorySegments);
+
 		this.networkBufferPool = networkBufferPool;
 		this.numberOfRequiredMemorySegments = numberOfRequiredMemorySegments;
 		this.currentPoolSize = numberOfRequiredMemorySegments;
+		this.maxNumberOfMemorySegments = maxNumberOfMemorySegments;
 	}
 
 	// ------------------------------------------------------------------------
@@ -113,6 +147,11 @@ class LocalBufferPool implements BufferPool {
 	}
 
 	@Override
+	public int getMaxNumberOfMemorySegments() {
+		return maxNumberOfMemorySegments;
+	}
+
+	@Override
 	public int getNumberOfAvailableMemorySegments() {
 		synchronized (availableMemorySegments) {
 			return availableMemorySegments.size();
@@ -160,6 +199,7 @@ class LocalBufferPool implements BufferPool {
 
 			boolean askToRecycle = owner != null;
 
+			// fill availableMemorySegments with at least one element, wait if required
 			while (availableMemorySegments.isEmpty()) {
 				if (isDestroyed) {
 					throw new IllegalStateException("Buffer pool is destroyed.");
@@ -257,9 +297,14 @@ class LocalBufferPool implements BufferPool {
 	@Override
 	public void setNumBuffers(int numBuffers) throws IOException {
 		synchronized (availableMemorySegments) {
-			checkArgument(numBuffers >= numberOfRequiredMemorySegments, "Buffer pool needs at least " + numberOfRequiredMemorySegments + " buffers, but tried to set to " + numBuffers + ".");
+			checkArgument(numBuffers >= numberOfRequiredMemorySegments, "Buffer pool needs at least " +
+				numberOfRequiredMemorySegments + " buffers, but tried to set to " + numBuffers + ".");
 
-			currentPoolSize = numBuffers;
+			if (numBuffers > maxNumberOfMemorySegments) {
+				currentPoolSize = maxNumberOfMemorySegments;
+			} else {
+				currentPoolSize = numBuffers;
+			}
 
 			returnExcessMemorySegments();
 
@@ -274,7 +319,10 @@ class LocalBufferPool implements BufferPool {
 	@Override
 	public String toString() {
 		synchronized (availableMemorySegments) {
-			return String.format("[size: %d, required: %d, requested: %d, available: %d, listeners: %d, destroyed: %s]", currentPoolSize, numberOfRequiredMemorySegments, numberOfRequestedMemorySegments, availableMemorySegments.size(), registeredListeners.size(), isDestroyed);
+			return String.format(
+				"[size: %d, required: %d, requested: %d, available: %d, max: %d, listeners: %d, destroyed: %s]",
+				currentPoolSize, numberOfRequiredMemorySegments, numberOfRequestedMemorySegments,
+				availableMemorySegments.size(), maxNumberOfMemorySegments, registeredListeners.size(), isDestroyed);
 		}
 	}
 
@@ -296,8 +344,7 @@ class LocalBufferPool implements BufferPool {
 				return;
 			}
 
-			networkBufferPool.recycle(segment);
-			numberOfRequestedMemorySegments--;
+			returnMemorySegment(segment);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 5345fbb..7668759 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -180,7 +180,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public BufferPool createBufferPool(int numRequiredBuffers) throws IOException {
+	public BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers) throws IOException {
 		// It is necessary to use a separate lock from the one used for buffer
 		// requests to ensure deadlock freedom for failure cases.
 		synchronized (factoryLock) {
@@ -205,7 +205,8 @@ public class NetworkBufferPool implements BufferPoolFactory {
 
 			// We are good to go, create a new buffer pool and redistribute
 			// non-fixed size buffers.
-			LocalBufferPool localBufferPool = new LocalBufferPool(this, numRequiredBuffers);
+			LocalBufferPool localBufferPool =
+				new LocalBufferPool(this, numRequiredBuffers, maxUsedBuffers);
 
 			allBufferPools.add(localBufferPool);
 
@@ -236,7 +237,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
 
 	/**
 	 * Destroys all buffer pools that allocate their buffers from this
-	 * buffer pool (created via {@link #createBufferPool(int)}).
+	 * buffer pool (created via {@link #createBufferPool(int, int)}).
 	 */
 	public void destroyAllBufferPools() {
 		synchronized (factoryLock) {
@@ -258,27 +259,60 @@ public class NetworkBufferPool implements BufferPoolFactory {
 	private void redistributeBuffers() throws IOException {
 		assert Thread.holdsLock(factoryLock);
 
-		int numManagedBufferPools = allBufferPools.size();
+		// All buffers, which are not among the required ones
+		int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
 
-		if (numManagedBufferPools == 0) {
-			return; // necessary to avoid div by zero when no managed pools
+		if (numAvailableMemorySegment == 0) {
+			// in this case, we need to redistribute buffers so that every pool gets its minimum
+			for (LocalBufferPool bufferPool : allBufferPools) {
+				bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments());
+			}
+			return;
 		}
 
-		// All buffers, which are not among the required ones
-		int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
+		/**
+		 * With buffer pools being potentially limited, let's distribute the available memory
+		 * segments based on the capacity of each buffer pool, i.e. the maximum number of segments
+		 * an unlimited buffer pool can take is numAvailableMemorySegment, for limited buffer pools
+		 * it may be less. Based on this and the sum of all these values (totalCapacity), we build
+		 * a ratio that we use to distribute the buffers.
+		 */
 
-		// Available excess (not required) buffers per pool
-		int numExcessBuffersPerPool = numAvailableMemorySegment / numManagedBufferPools;
+		int totalCapacity = 0;
+		for (LocalBufferPool bufferPool : allBufferPools) {
+			int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
+				bufferPool.getNumberOfRequiredMemorySegments();
+			totalCapacity += Math.min(numAvailableMemorySegment, excessMax);
+		}
 
-		// Distribute leftover buffers in round robin fashion
-		int numLeftoverBuffers = numAvailableMemorySegment % numManagedBufferPools;
+		// no capacity to receive additional buffers?
+		if (totalCapacity == 0) {
+			return; // necessary to avoid div by zero when nothing to re-distribute
+		}
 
-		int bufferPoolIndex = 0;
+		int memorySegmentsToDistribute = Math.min(numAvailableMemorySegment, totalCapacity);
 
+		int totalPartsUsed = 0; // of totalCapacity
+		int numDistributedMemorySegment = 0;
 		for (LocalBufferPool bufferPool : allBufferPools) {
-			int leftoverBuffers = bufferPoolIndex++ < numLeftoverBuffers ? 1 : 0;
+			int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
+				bufferPool.getNumberOfRequiredMemorySegments();
+
+			// shortcut
+			if (excessMax == 0) {
+				continue;
+			}
+
+			totalPartsUsed += Math.min(numAvailableMemorySegment, excessMax);
 
-			bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + numExcessBuffersPerPool + leftoverBuffers);
+			// avoid remaining buffers by looking at the total capacity that should have been
+			// re-distributed up until here
+			int mySize = memorySegmentsToDistribute * totalPartsUsed / totalCapacity - numDistributedMemorySegment;
+			numDistributedMemorySegment += mySize;
+			bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + mySize);
 		}
+
+		assert (totalPartsUsed == totalCapacity);
+		assert (numDistributedMemorySegment == memorySegmentsToDistribute);
 	}
 }
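
To illustrate the proportional redistribution introduced above, here is a small self-contained sketch (made-up numbers, not part of the commit) that applies the same arithmetic as redistributeBuffers() to two pools sharing a 20-segment network buffer pool, one of them bounded:

    // Pool 0: required = 2, max = 10; pool 1: required = 2, unbounded.
    public class RedistributionSketch {
        public static void main(String[] args) {
            int totalSegments = 20;
            int[] required = {2, 2};
            int[] max = {10, Integer.MAX_VALUE};

            int available = totalSegments - (required[0] + required[1]); // 16 excess segments

            // capacity = excess segments each pool could still absorb
            long totalCapacity = 0;
            for (int i = 0; i < required.length; i++) {
                // min(16, 8) = 8 for pool 0, min(16, MAX_VALUE - 2) = 16 for pool 1
                totalCapacity += Math.min(available, max[i] - required[i]);
            }
            // totalCapacity = 24

            int toDistribute = (int) Math.min(available, totalCapacity); // 16
            long partsUsed = 0;
            int distributed = 0;
            for (int i = 0; i < required.length; i++) {
                partsUsed += Math.min(available, max[i] - required[i]);
                int share = (int) (toDistribute * partsUsed / totalCapacity) - distributed;
                distributed += share;
                System.out.println("pool " + i + ": " + (required[i] + share) + " buffers");
            }
            // prints 7 for pool 0 and 13 for pool 1 -- all 20 segments are handed out,
            // and the running integer division leaves no remainder behind.
        }
    }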

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
index 36c1234..6f56877 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
@@ -67,7 +67,7 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes
 	public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
 		super.channelRegistered(ctx);
 
-		bufferPool = networkBufferPool.createBufferPool(1);
+		bufferPool = networkBufferPool.createBufferPool(1, Integer.MAX_VALUE);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 3d92584..eb1418b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -160,6 +160,7 @@ public class ResultPartition implements BufferPoolOwner {
 				break;
 
 			case PIPELINED:
+			case PIPELINED_BOUNDED:
 				for (int i = 0; i < subpartitions.length; i++) {
 					subpartitions[i] = new PipelinedSubpartition(i, this);
 				}
@@ -237,6 +238,15 @@ public class ResultPartition implements BufferPoolOwner {
 		return totalBuffers;
 	}
 
+	/**
+	 * Returns the type of this result partition.
+	 *
+	 * @return result partition type
+	 */
+	public ResultPartitionType getPartitionType() {
+		return partitionType;
+	}
+
 	// ------------------------------------------------------------------------
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
index 43d3a52..256387c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
@@ -20,9 +20,22 @@ package org.apache.flink.runtime.io.network.partition;
 
 public enum ResultPartitionType {
 
-	BLOCKING(false, false),
+	BLOCKING(false, false, false),
 
-	PIPELINED(true, true);
+	PIPELINED(true, true, false),
+
+	/**
+	 * Pipelined partitions with a bounded (local) buffer pool.
+	 *
+	 * For streaming jobs, a fixed limit on the buffer pool size should help avoid that too much
+	 * data is being buffered and checkpoint barriers are delayed. In contrast to limiting the
+	 * overall network buffer pool size, this, however, still allows to be flexible with regards
+	 * to the total number of partitions by selecting an appropriately big network buffer pool size.
+	 *
+	 * For batch jobs, it will be best to keep this unlimited ({@link #PIPELINED}) since there are
+	 * no checkpoint barriers.
+	 */
+	PIPELINED_BOUNDED(true, true, true);
 
 	/** Can the partition be consumed while being produced? */
 	private final boolean isPipelined;
@@ -30,12 +43,16 @@ public enum ResultPartitionType {
 	/** Does the partition produce back pressure when not consumed? */
 	private final boolean hasBackPressure;
 
+	/** Does this partition use a limited number of (network) buffers? */
+	private final boolean isBounded;
+
 	/**
 	 * Specifies the behaviour of an intermediate result partition at runtime.
 	 */
-	ResultPartitionType(boolean isPipelined, boolean hasBackPressure) {
+	ResultPartitionType(boolean isPipelined, boolean hasBackPressure, boolean isBounded) {
 		this.isPipelined = isPipelined;
 		this.hasBackPressure = hasBackPressure;
+		this.isBounded = isBounded;
 	}
 
 	public boolean hasBackPressure() {
@@ -49,4 +66,13 @@ public enum ResultPartitionType {
 	public boolean isPipelined() {
 		return isPipelined;
 	}
+
+	/**
+	 * Whether this partition uses a limited number of (network) buffers or not.
+	 *
+	 * @return <tt>true</tt> if the number of buffers should be bound to some limit
+	 */
+	public boolean isBounded() {
+		return isBounded;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index d546559..afe8722 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -115,6 +116,9 @@ public class SingleInputGate implements InputGate {
 	 */
 	private final IntermediateDataSetID consumedResultId;
 
+	/** The type of the partition the input gate is consuming. */
+	private final ResultPartitionType consumedPartitionType;
+
 	/**
 	 * The index of the consumed subpartition of each consumed partition. This index depends on the
 	 * {@link DistributionPattern} and the subtask indices of the producing and consuming task.
@@ -166,6 +170,7 @@ public class SingleInputGate implements InputGate {
 		String owningTaskName,
 		JobID jobId,
 		IntermediateDataSetID consumedResultId,
+		final ResultPartitionType consumedPartitionType,
 		int consumedSubpartitionIndex,
 		int numberOfInputChannels,
 		TaskActions taskActions,
@@ -175,6 +180,7 @@ public class SingleInputGate implements InputGate {
 		this.jobId = checkNotNull(jobId);
 
 		this.consumedResultId = checkNotNull(consumedResultId);
+		this.consumedPartitionType = checkNotNull(consumedPartitionType);
 
 		checkArgument(consumedSubpartitionIndex >= 0);
 		this.consumedSubpartitionIndex = consumedSubpartitionIndex;
@@ -201,6 +207,15 @@ public class SingleInputGate implements InputGate {
 		return consumedResultId;
 	}
 
+	/**
+	 * Returns the type of this input channel's consumed result partition.
+	 *
+	 * @return consumed result partition type
+	 */
+	public ResultPartitionType getConsumedPartitionType() {
+		return consumedPartitionType;
+	}
+
 	BufferProvider getBufferProvider() {
 		return bufferPool;
 	}
@@ -571,6 +586,7 @@ public class SingleInputGate implements InputGate {
 		TaskIOMetricGroup metrics) {
 
 		final IntermediateDataSetID consumedResultId = checkNotNull(igdd.getConsumedResultId());
+		final ResultPartitionType consumedPartitionType = checkNotNull(igdd.getConsumedPartitionType());
 
 		final int consumedSubpartitionIndex = igdd.getConsumedSubpartitionIndex();
 		checkArgument(consumedSubpartitionIndex >= 0);
@@ -578,7 +594,7 @@ public class SingleInputGate implements InputGate {
 		final InputChannelDeploymentDescriptor[] icdd = checkNotNull(igdd.getInputChannelDeploymentDescriptors());
 
 		final SingleInputGate inputGate = new SingleInputGate(
-			owningTaskName, jobId, consumedResultId, consumedSubpartitionIndex,
+			owningTaskName, jobId, consumedResultId, consumedPartitionType, consumedSubpartitionIndex,
 			icdd.length, taskActions, metrics);
 
 		// Create the input channels. There is one input channel for each consumed partition.

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 19c5c01..e3c8345 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -370,7 +370,9 @@ public class TaskManagerServices {
 			kvStateServer,
 			networkEnvironmentConfiguration.ioMode(),
 			networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
-			networkEnvironmentConfiguration.partitionRequestMaxBackoff());
+			networkEnvironmentConfiguration.partitionRequestMaxBackoff(),
+			networkEnvironmentConfiguration.networkBuffersPerChannel(),
+			networkEnvironmentConfiguration.extraNetworkBuffersPerGate());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 2c76372..8ad318a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.HeapMemorySegment;
 import org.apache.flink.core.memory.HybridMemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
@@ -311,13 +312,25 @@ public class TaskManagerServicesConfiguration {
 			ioMode = IOManager.IOMode.SYNC;
 		}
 
+		int initialRequestBackoff = configuration.getInteger(
+			TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+		int maxRequestBackoff = configuration.getInteger(
+			TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+		int buffersPerChannel = configuration.getInteger(
+			TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+		int extraBuffersPerGate = configuration.getInteger(
+			TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
 		return new NetworkEnvironmentConfiguration(
 			numNetworkBuffers,
 			pageSize,
 			memType,
 			ioMode,
-			500,
-			3000,
+			initialRequestBackoff,
+			maxRequestBackoff,
+			buffersPerChannel,
+			extraBuffersPerGate,
 			nettyConfig);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
index 6c7ca3e..4ecfe59 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
@@ -29,4 +29,6 @@ case class NetworkEnvironmentConfiguration(
     ioMode: IOMode,
     partitionRequestInitialBackoff : Int,
     partitionRequestMaxBackoff : Int,
+    networkBuffersPerChannel: Int,
+    extraNetworkBuffersPerGate: Int,
     nettyConfig: NettyConfig = null)

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java
index dcdf44c..3bbe6d5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java
@@ -36,7 +36,7 @@ public class MockNetworkEnvironment {
 
 	static {
 		try {
-			when(networkBufferPool.createBufferPool(anyInt())).thenReturn(mock(BufferPool.class));
+			when(networkBufferPool.createBufferPool(anyInt(), anyInt())).thenReturn(mock(BufferPool.class));
 			when(networkEnvironment.getNetworkBufferPool()).thenReturn(networkBufferPool);
 
 			when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
new file mode 100644
index 0000000..b956691
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Various tests for the {@link NetworkEnvironment} class.
+ */
+public class NetworkEnvironmentTest {
+	private final static int numBuffers = 1024;
+
+	private final static int memorySegmentSize = 128;
+
+	/**
+	 * Verifies that {@link NetworkEnvironment#registerTask(Task)} sets up (un)bounded buffer pool
+	 * instances for various types of input and output channels.
+	 */
+	@Test
+	public void testRegisterTaskUsesBoundedBuffers() throws Exception {
+
+		final NetworkEnvironment network = new NetworkEnvironment(
+			new NetworkBufferPool(numBuffers, memorySegmentSize, MemoryType.HEAP),
+			new LocalConnectionManager(),
+			new ResultPartitionManager(),
+			new TaskEventDispatcher(),
+			new KvStateRegistry(),
+			null,
+			IOManager.IOMode.SYNC,
+			0,
+			0,
+			2,
+			8);
+
+		// result partitions
+		ResultPartition rp1 = createResultPartition(ResultPartitionType.PIPELINED, 2);
+		ResultPartition rp2 = createResultPartition(ResultPartitionType.BLOCKING, 2);
+		ResultPartition rp3 = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 2);
+		ResultPartition rp4 = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 8);
+		final ResultPartition[] resultPartitions = new ResultPartition[] {rp1, rp2, rp3, rp4};
+		final ResultPartitionWriter[] resultPartitionWriters = new ResultPartitionWriter[] {
+			new ResultPartitionWriter(rp1), new ResultPartitionWriter(rp2),
+			new ResultPartitionWriter(rp3), new ResultPartitionWriter(rp4)};
+
+		// input gates
+		final SingleInputGate[] inputGates = new SingleInputGate[] {
+			createSingleInputGateMock(ResultPartitionType.PIPELINED, 2),
+			createSingleInputGateMock(ResultPartitionType.BLOCKING, 2),
+			createSingleInputGateMock(ResultPartitionType.PIPELINED_BOUNDED, 2),
+			createSingleInputGateMock(ResultPartitionType.PIPELINED_BOUNDED, 8)};
+
+		// overall task to register
+		Task task = mock(Task.class);
+		when(task.getProducedPartitions()).thenReturn(resultPartitions);
+		when(task.getAllWriters()).thenReturn(resultPartitionWriters);
+		when(task.getAllInputGates()).thenReturn(inputGates);
+
+		network.registerTask(task);
+
+		assertEquals(Integer.MAX_VALUE, rp1.getBufferPool().getMaxNumberOfMemorySegments());
+		assertEquals(Integer.MAX_VALUE, rp2.getBufferPool().getMaxNumberOfMemorySegments());
+		assertEquals(2 * 2 + 8, rp3.getBufferPool().getMaxNumberOfMemorySegments());
+		assertEquals(8 * 2 + 8, rp4.getBufferPool().getMaxNumberOfMemorySegments());
+
+		network.shutdown();
+	}
+
+	/**
+	 * Helper to create simple {@link ResultPartition} instance for use by a {@link Task} inside
+	 * {@link NetworkEnvironment#registerTask(Task)}.
+	 *
+	 * @param partitionType
+	 * 		the produced partition type
+	 * @param channels
+	 * 		the number of output channels
+	 *
+	 * @return instance with minimal data set and some mocks so that it is useful for {@link
+	 * NetworkEnvironment#registerTask(Task)}
+	 */
+	private static ResultPartition createResultPartition(
+			final ResultPartitionType partitionType, final int channels) {
+		return new ResultPartition(
+			"TestTask-" + partitionType + ":" + channels,
+			mock(TaskActions.class),
+			new JobID(),
+			new ResultPartitionID(),
+			partitionType,
+			channels,
+			channels,
+			mock(ResultPartitionManager.class),
+			mock(ResultPartitionConsumableNotifier.class),
+			mock(IOManager.class),
+			false);
+	}
+
+	/**
+	 * Helper to create a mock of a {@link SingleInputGate} for use by a {@link Task} inside
+	 * {@link NetworkEnvironment#registerTask(Task)}.
+	 *
+	 * @param partitionType
+	 * 		the consumed partition type
+	 * @param channels
+	 * 		the number of input channels
+	 *
+	 * @return mock with minimal functionality necessary by {@link NetworkEnvironment#registerTask(Task)}
+	 */
+	private static SingleInputGate createSingleInputGateMock(
+			final ResultPartitionType partitionType, final int channels) {
+		SingleInputGate ig = mock(SingleInputGate.class);
+		when(ig.getConsumedPartitionType()).thenReturn(partitionType);
+		when(ig.getNumberOfInputChannels()).thenReturn(channels);
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(final InvocationOnMock invocation) throws Throwable {
+				BufferPool bp = invocation.getArgumentAt(0, BufferPool.class);
+				if (partitionType == ResultPartitionType.PIPELINED_BOUNDED) {
+					assertEquals(channels * 2 + 8, bp.getMaxNumberOfMemorySegments());
+				} else {
+					assertEquals(Integer.MAX_VALUE, bp.getMaxNumberOfMemorySegments());
+				}
+				return null;
+			}
+		}).when(ig).setBufferPool(any(BufferPool.class));
+		return ig;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index ca1d0a5..7d83fb5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -177,7 +177,7 @@ public class RecordWriterTest {
 
 		try {
 			buffers = new NetworkBufferPool(1, 1024, MemoryType.HEAP);
-			bufferPool = spy(buffers.createBufferPool(1));
+			bufferPool = spy(buffers.createBufferPool(1, Integer.MAX_VALUE));
 
 			ResultPartitionWriter partitionWriter = mock(ResultPartitionWriter.class);
 			when(partitionWriter.getBufferProvider()).thenReturn(checkNotNull(bufferPool));

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
index 49808c9..ce76a6d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.fail;
 
 public class BufferPoolFactoryTest {
@@ -48,61 +49,190 @@ public class BufferPoolFactoryTest {
 	public void verifyAllBuffersReturned() {
 		String msg = "Did not return all buffers to network buffer pool after test.";
 		assertEquals(msg, numBuffers, networkBufferPool.getNumberOfAvailableMemorySegments());
+		// in case buffers have actually been requested, we must release them again
+		networkBufferPool.destroy();
 	}
 
 	@Test(expected = IOException.class)
 	public void testRequireMoreThanPossible() throws IOException {
-		networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() * 2);
+		networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() * 2, Integer.MAX_VALUE);
+	}
+
+	@Test
+	public void testBoundedPools() throws IOException {
+		BufferPool lbp = networkBufferPool.createBufferPool(1, 1);
+		assertEquals(1, lbp.getNumBuffers());
+
+		lbp = networkBufferPool.createBufferPool(1, 2);
+		assertEquals(2, lbp.getNumBuffers());
 	}
 
 	@Test
 	public void testSingleManagedPoolGetsAll() throws IOException {
-		BufferPool lbp = networkBufferPool.createBufferPool(1);
+		BufferPool lbp = networkBufferPool.createBufferPool(1, Integer.MAX_VALUE);
 
 		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments(), lbp.getNumBuffers());
 	}
 
 	@Test
+	public void testSingleManagedPoolGetsAllExceptFixedOnes() throws IOException {
+		BufferPool fixed = networkBufferPool.createBufferPool(24, 24);
+
+		BufferPool lbp = networkBufferPool.createBufferPool(1, Integer.MAX_VALUE);
+
+		assertEquals(24, fixed.getNumBuffers());
+		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() - fixed.getNumBuffers(), lbp.getNumBuffers());
+	}
+
+	@Test
 	public void testUniformDistribution() throws IOException {
-		BufferPool first = networkBufferPool.createBufferPool(0);
-		BufferPool second = networkBufferPool.createBufferPool(0);
+		BufferPool first = networkBufferPool.createBufferPool(0, Integer.MAX_VALUE);
+		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments(), first.getNumBuffers());
 
+		BufferPool second = networkBufferPool.createBufferPool(0, Integer.MAX_VALUE);
 		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2, first.getNumBuffers());
 		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2, second.getNumBuffers());
 	}
 
+	/**
+	 * Tests that buffers, once given to an initial buffer pool, get re-distributed to a second one
+	 * in case both buffer pools request half of the available buffer count.
+	 */
 	@Test
-	public void testAllDistributed() {
-		Random random = new Random();
+	public void testUniformDistributionAllBuffers() throws IOException {
+		BufferPool first = networkBufferPool
+			.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() / 2, Integer.MAX_VALUE);
+		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments(), first.getNumBuffers());
 
-		try {
-			List<BufferPool> pools = new ArrayList<BufferPool>();
+		BufferPool second = networkBufferPool
+			.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() / 2, Integer.MAX_VALUE);
+		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2, first.getNumBuffers());
+		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2,
+			second.getNumBuffers());
+	}
 
-			int numPools = numBuffers / 32;
-			for (int i = 0; i < numPools; i++) {
-				pools.add(networkBufferPool.createBufferPool(random.nextInt(7 + 1)));
+	@Test
+	public void testUniformDistributionBounded1() throws IOException {
+		BufferPool first = networkBufferPool.createBufferPool(0, networkBufferPool.getTotalNumberOfMemorySegments());
+		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments(), first.getNumBuffers());
+
+		BufferPool second = networkBufferPool.createBufferPool(0, networkBufferPool.getTotalNumberOfMemorySegments());
+		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2, first.getNumBuffers());
+		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2, second.getNumBuffers());
+	}
+
+	@Test
+	public void testUniformDistributionBounded2() throws IOException {
+		BufferPool first = networkBufferPool.createBufferPool(0, 10);
+		assertEquals(10, first.getNumBuffers());
+
+		BufferPool second = networkBufferPool.createBufferPool(0, 10);
+		assertEquals(10, first.getNumBuffers());
+		assertEquals(10, second.getNumBuffers());
+	}
+
+	@Test
+	public void testUniformDistributionBounded3() throws IOException {
+		NetworkBufferPool globalPool = new NetworkBufferPool(3, 128, MemoryType.HEAP);
+		BufferPool first = globalPool.createBufferPool(0, 10);
+		assertEquals(3, first.getNumBuffers());
+
+		BufferPool second = globalPool.createBufferPool(0, 10);
+		// the order of which buffer pool received 2 or 1 buffer is undefined
+		assertEquals(3, first.getNumBuffers() + second.getNumBuffers());
+		assertNotEquals(3, first.getNumBuffers());
+		assertNotEquals(3, second.getNumBuffers());
+
+		BufferPool third = globalPool.createBufferPool(0, 10);
+		assertEquals(1, first.getNumBuffers());
+		assertEquals(1, second.getNumBuffers());
+		assertEquals(1, third.getNumBuffers());
+
+		// similar to #verifyAllBuffersReturned()
+		String msg = "Did not return all buffers to network buffer pool after test.";
+		assertEquals(msg, 3, globalPool.getNumberOfAvailableMemorySegments());
+		// in case buffers have actually been requested, we must release them again
+		globalPool.destroy();
+	}
+
+	@Test
+	public void testBufferRedistributionMixed1() throws IOException {
+		// try multiple times for various orders during redistribution
+		for (int i = 0; i < 1_000; ++i) {
+			BufferPool first = networkBufferPool.createBufferPool(0, 10);
+			assertEquals(10, first.getNumBuffers());
+
+			BufferPool second = networkBufferPool.createBufferPool(0, 10);
+			assertEquals(10, first.getNumBuffers());
+			assertEquals(10, second.getNumBuffers());
+
+			BufferPool third = networkBufferPool.createBufferPool(0, Integer.MAX_VALUE);
+			// note: the exact buffer distribution depends on the order of the pools during redistribution
+			for (BufferPool bp : new BufferPool[] {first, second, third}) {
+				int size = networkBufferPool.getTotalNumberOfMemorySegments() *
+					Math.min(networkBufferPool.getTotalNumberOfMemorySegments(),
+						bp.getMaxNumberOfMemorySegments()) /
+					(networkBufferPool.getTotalNumberOfMemorySegments() + 20);
+				if (bp.getNumBuffers() != size && bp.getNumBuffers() != (size + 1)) {
+					fail("wrong buffer pool size after redistribution: " + bp.getNumBuffers());
+				}
 			}
 
-			int numDistributedBuffers = 0;
-			for (BufferPool pool : pools) {
-				numDistributedBuffers += pool.getNumBuffers();
+			BufferPool fourth = networkBufferPool.createBufferPool(0, Integer.MAX_VALUE);
+			// note: the exact buffer distribution depends on the order of the pools during redistribution
+			for (BufferPool bp : new BufferPool[] {first, second, third, fourth}) {
+				int size = networkBufferPool.getTotalNumberOfMemorySegments() *
+					Math.min(networkBufferPool.getTotalNumberOfMemorySegments(),
+						bp.getMaxNumberOfMemorySegments()) /
+					(2 * networkBufferPool.getTotalNumberOfMemorySegments() + 20);
+				if (bp.getNumBuffers() != size && bp.getNumBuffers() != (size + 1)) {
+					fail("wrong buffer pool size after redistribution: " + bp.getNumBuffers());
+				}
 			}
 
-			assertEquals(numBuffers, numDistributedBuffers);
+			verifyAllBuffersReturned();
+			setupNetworkBufferPool();
 		}
-		catch (Throwable t) {
-			t.printStackTrace();
-			fail(t.getMessage());
+	}
+
+	@Test
+	public void testAllDistributed() throws IOException {
+		// run multiple times to cover different orderings during redistribution
+		for (int i = 0; i < 1_000; ++i) {
+			Random random = new Random();
+
+			List<BufferPool> pools = new ArrayList<BufferPool>();
+
+			int numPools = numBuffers / 32;
+			long maxTotalUsed = 0;
+			for (int j = 0; j < numPools; j++) {
+				int numRequiredBuffers = random.nextInt(7 + 1);
+				// make unbounded buffer pools more likely:
+				int maxUsedBuffers = random.nextBoolean() ? Integer.MAX_VALUE :
+					Math.max(1, random.nextInt(10) + numRequiredBuffers);
+				pools.add(networkBufferPool.createBufferPool(numRequiredBuffers, maxUsedBuffers));
+				maxTotalUsed = Math.min(numBuffers, maxTotalUsed + maxUsedBuffers);
+
+				// after every iteration, all buffers (up to maxTotalUsed) must be distributed
+				int numDistributedBuffers = 0;
+				for (BufferPool pool : pools) {
+					numDistributedBuffers += pool.getNumBuffers();
+				}
+				assertEquals(maxTotalUsed, numDistributedBuffers);
+			}
+
+			verifyAllBuffersReturned();
+			setupNetworkBufferPool();
 		}
 	}
 
 	@Test
 	public void testCreateDestroy() throws IOException {
-		BufferPool first = networkBufferPool.createBufferPool(0);
+		BufferPool first = networkBufferPool.createBufferPool(0, Integer.MAX_VALUE);
 
 		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments(), first.getNumBuffers());
 
-		BufferPool second = networkBufferPool.createBufferPool(0);
+		BufferPool second = networkBufferPool.createBufferPool(0, Integer.MAX_VALUE);
 
 		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2, first.getNumBuffers());
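For reference, the sizes asserted in testBufferRedistributionMixed1 above amount to handing out all memory segments proportionally to each pool's effective maximum (capped at the total segment count), given that none of the pools has required buffers. Below is a minimal arithmetic sketch of that expectation; the helper class and the assumed segment count of 128 are illustration only, not part of this patch:

    // Sketch only: reproduces the expected-size arithmetic behind the assertions above.
    public class RedistributionMath {

        public static void main(String[] args) {
            int totalSegments = 128;                          // assumed global pool size
            int[] poolMaxima = { 10, 10, Integer.MAX_VALUE }; // maxima used in the test

            // each pool's maximum is capped at the total number of segments,
            // so this sum corresponds to "totalSegments + 20" in the test
            long sumOfCappedMaxima = 0;
            for (int max : poolMaxima) {
                sumOfCappedMaxima += Math.min(totalSegments, max);
            }

            for (int max : poolMaxima) {
                int cappedMax = Math.min(totalSegments, max);
                // integer division: the actual pool may end up with size or size + 1
                int size = (int) ((long) totalSegments * cappedMax / sumOfCappedMaxima);
                System.out.println("expected pool size: " + size + " (or " + (size + 1) + ")");
            }
        }
    }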
 

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index 93731e1..a186d56 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -328,6 +328,64 @@ public class LocalBufferPoolTest {
 		}
 	}
 
+	@Test
+	public void testBoundedBuffer() throws Exception {
+		localBufferPool.lazyDestroy();
+
+		localBufferPool = new LocalBufferPool(networkBufferPool, 1, 2);
+		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+		assertEquals(2, localBufferPool.getMaxNumberOfMemorySegments());
+
+		Buffer buffer1, buffer2;
+
+		// check min number of buffers:
+		localBufferPool.setNumBuffers(1);
+		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+		assertNotNull(buffer1 = localBufferPool.requestBuffer());
+		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+		assertNull(localBufferPool.requestBuffer());
+		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+		buffer1.recycle();
+		assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
+
+		// check max number of buffers:
+		localBufferPool.setNumBuffers(2);
+		assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
+		assertNotNull(buffer1 = localBufferPool.requestBuffer());
+		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+		assertNotNull(buffer2 = localBufferPool.requestBuffer());
+		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+		assertNull(localBufferPool.requestBuffer());
+		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+		buffer1.recycle();
+		assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
+		buffer2.recycle();
+		assertEquals(2, localBufferPool.getNumberOfAvailableMemorySegments());
+
+		// try to set a pool size larger than the maximum:
+		localBufferPool.setNumBuffers(3);
+		assertEquals(2, localBufferPool.getNumberOfAvailableMemorySegments());
+		assertNotNull(buffer1 = localBufferPool.requestBuffer());
+		assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
+		assertNotNull(buffer2 = localBufferPool.requestBuffer());
+		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+		assertNull(localBufferPool.requestBuffer());
+		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+		buffer1.recycle();
+		assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
+		buffer2.recycle();
+		assertEquals(2, localBufferPool.getNumberOfAvailableMemorySegments());
+
+		// decrease size again
+		localBufferPool.setNumBuffers(1);
+		assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
+		assertNotNull(buffer1 = localBufferPool.requestBuffer());
+		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+		assertNull(localBufferPool.requestBuffer());
+		buffer1.recycle();
+		assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
+	}
+
 	// ------------------------------------------------------------------------
 	// Helpers
 	// ------------------------------------------------------------------------
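To make the min/max semantics exercised by testBoundedBuffer easier to follow: a bounded pool never hands out more buffers than its maximum, and requestBuffer() simply returns null once the pool's current size is exhausted, no matter how many segments the global pool still holds. A small self-contained sketch (segment count and size are arbitrary example values, not taken from this patch):

    import org.apache.flink.core.memory.MemoryType;
    import org.apache.flink.runtime.io.network.buffer.Buffer;
    import org.apache.flink.runtime.io.network.buffer.BufferPool;
    import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;

    public class BoundedPoolExample {

        public static void main(String[] args) throws Exception {
            NetworkBufferPool globalPool = new NetworkBufferPool(16, 32 * 1024, MemoryType.HEAP);

            // at least 1 segment is reserved for this pool, at most 2 are ever handed out
            BufferPool bounded = globalPool.createBufferPool(1, 2);

            Buffer b1 = bounded.requestBuffer(); // non-null
            Buffer b2 = bounded.requestBuffer(); // non-null as long as the pool size is >= 2
            Buffer b3 = bounded.requestBuffer(); // null: the maximum of 2 is reached

            if (b2 != null) {
                b2.recycle();
            }
            b1.recycle();

            bounded.lazyDestroy();
            globalPool.destroy();
        }
    }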

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index ab32685..7c6a543 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.io.network.buffer;
 import org.apache.flink.core.memory.MemoryType;
 import org.junit.Test;
 
+import java.util.ArrayList;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -47,7 +49,23 @@ public class NetworkBufferPoolTest {
 			assertTrue(globalPool.isDestroyed());
 
 			try {
-				globalPool.createBufferPool(2);
+				globalPool.createBufferPool(2, 2);
+				fail("Should throw an IllegalStateException");
+			}
+			catch (IllegalStateException e) {
+				// yippie!
+			}
+
+			try {
+				globalPool.createBufferPool(2, 10);
+				fail("Should throw an IllegalStateException");
+			}
+			catch (IllegalStateException e) {
+				// yippie!
+			}
+
+			try {
+				globalPool.createBufferPool(2, Integer.MAX_VALUE);
 				fail("Should throw an IllegalStateException");
 			}
 			catch (IllegalStateException e) {
@@ -60,39 +78,51 @@ public class NetworkBufferPoolTest {
 		}
 
 	}
+
 	@Test
 	public void testDestroyAll() {
 		try {
-			NetworkBufferPool globalPool = new NetworkBufferPool(8, 128, MemoryType.HEAP);
-
-			BufferPool lbp = globalPool.createBufferPool(5);
-
-			assertEquals(5, lbp.getNumberOfRequiredMemorySegments());
-
-			Buffer[] buffers = {
-					lbp.requestBuffer(),
-					lbp.requestBuffer(),
-					lbp.requestBuffer(),
-					lbp.requestBuffer(),
-					lbp.requestBuffer(),
-					lbp.requestBuffer(),
-					lbp.requestBuffer(),
-					lbp.requestBuffer()
-			};
-
-			for (Buffer b : buffers) {
-				assertNotNull(b);
-				assertNotNull(b.getMemorySegment());
+			NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, MemoryType.HEAP);
+
+			BufferPool fixedPool = globalPool.createBufferPool(2, 2);
+			BufferPool boundedPool = globalPool.createBufferPool(0, 1);
+			BufferPool nonFixedPool = globalPool.createBufferPool(5, Integer.MAX_VALUE);
+
+			assertEquals(2, fixedPool.getNumberOfRequiredMemorySegments());
+			assertEquals(0, boundedPool.getNumberOfRequiredMemorySegments());
+			assertEquals(5, nonFixedPool.getNumberOfRequiredMemorySegments());
+
+			// the individual buffer pool sizes may differ due to rounding and the internal order of the
+			// buffer pools, but the total number of retrievable buffers must equal the number of buffers
+			// in the NetworkBufferPool
+
+			ArrayList<Buffer> buffers = new ArrayList<>(globalPool.getTotalNumberOfMemorySegments());
+			collectBuffers:
+			for (int i = 0; i < 10; ++i) {
+				for (BufferPool bp : new BufferPool[] { fixedPool, boundedPool, nonFixedPool }) {
+					Buffer buffer = bp.requestBuffer();
+					if (buffer != null) {
+						assertNotNull(buffer.getMemorySegment());
+						buffers.add(buffer);
+						continue collectBuffers;
+					}
+				}
 			}
 
-			assertNull(lbp.requestBuffer());
+			assertEquals(globalPool.getTotalNumberOfMemorySegments(), buffers.size());
+
+			assertNull(fixedPool.requestBuffer());
+			assertNull(boundedPool.requestBuffer());
+			assertNull(nonFixedPool.requestBuffer());
 
 			// destroy all allocated ones
 			globalPool.destroyAllBufferPools();
 
 			// check the destroyed status
 			assertFalse(globalPool.isDestroyed());
-			assertTrue(lbp.isDestroyed());
+			assertTrue(fixedPool.isDestroyed());
+			assertTrue(boundedPool.isDestroyed());
+			assertTrue(nonFixedPool.isDestroyed());
 
 			assertEquals(0, globalPool.getNumberOfRegisteredBufferPools());
 
@@ -107,7 +137,23 @@ public class NetworkBufferPoolTest {
 
 			// can request no more buffers
 			try {
-				lbp.requestBuffer();
+				fixedPool.requestBuffer();
+				fail("Should fail with an IllegalStateException");
+			}
+			catch (IllegalStateException e) {
+				// yippie!
+			}
+
+			try {
+				boundedPool.requestBuffer();
+				fail("Should fail with an IllegalStateException");
+			}
+			catch (IllegalStateException e) {
+				// that's the way we like it, aha, aha
+			}
+
+			try {
+				nonFixedPool.requestBuffer();
 				fail("Should fail with an IllegalStateException");
 			}
 			catch (IllegalStateException e) {
@@ -115,7 +161,7 @@ public class NetworkBufferPoolTest {
 			}
 
 			// can create a new pool now
-			assertNotNull(globalPool.createBufferPool(8));
+			assertNotNull(globalPool.createBufferPool(10, Integer.MAX_VALUE));
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
index 27177c9..64f82f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
@@ -60,7 +60,7 @@ public class InputGateConcurrentTest {
 		final SingleInputGate gate = new SingleInputGate(
 				"Test Task Name",
 				new JobID(),
-				new IntermediateDataSetID(),
+				new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
 				0, numChannels,
 				mock(TaskActions.class),
 				new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
@@ -95,7 +95,7 @@ public class InputGateConcurrentTest {
 		final SingleInputGate gate = new SingleInputGate(
 				"Test Task Name",
 				new JobID(),
-				new IntermediateDataSetID(),
+				new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
 				0,
 				numChannels,
 				mock(TaskActions.class),
@@ -144,7 +144,7 @@ public class InputGateConcurrentTest {
 		final SingleInputGate gate = new SingleInputGate(
 				"Test Task Name",
 				new JobID(),
-				new IntermediateDataSetID(),
+				new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
 				0,
 				numChannels,
 				mock(TaskActions.class),

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index 7e1d792..f933840 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -347,7 +347,8 @@ public class InputGateFairnessTest {
 				TaskActions taskActions,
 				TaskIOMetricGroup metrics) {
 
-			super(owningTaskName, jobId, consumedResultId, consumedSubpartitionIndex,
+			super(owningTaskName, jobId, consumedResultId, ResultPartitionType.PIPELINED,
+				consumedSubpartitionIndex,
 					numberOfInputChannels, taskActions, metrics);
 
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 15ff2da..18c3038 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -128,7 +128,7 @@ public class LocalInputChannelTest {
 
 			// Create a buffer pool for this partition
 			partition.registerBufferPool(
-				networkBuffers.createBufferPool(producerBufferPoolSize));
+				networkBuffers.createBufferPool(producerBufferPoolSize, producerBufferPoolSize));
 
 			// Create the producer
 			partitionProducers[i] = new TestPartitionProducer(
@@ -162,7 +162,7 @@ public class LocalInputChannelTest {
 						i,
 						parallelism,
 						numberOfBuffersPerChannel,
-						networkBuffers.createBufferPool(parallelism),
+						networkBuffers.createBufferPool(parallelism, parallelism),
 						partitionManager,
 						new TaskEventDispatcher(),
 						partitionIds)));
@@ -284,6 +284,7 @@ public class LocalInputChannelTest {
 			"test task name",
 			new JobID(),
 			new IntermediateDataSetID(),
+			ResultPartitionType.PIPELINED,
 			0,
 			1,
 			mock(TaskActions.class),
@@ -481,6 +482,7 @@ public class LocalInputChannelTest {
 					"Test Name",
 					new JobID(),
 					new IntermediateDataSetID(),
+					ResultPartitionType.PIPELINED,
 					subpartitionIndex,
 					numberOfInputChannels,
 					mock(TaskActions.class),

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index a25b8d5..2d1b4b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -73,7 +74,13 @@ public class SingleInputGateTest {
 	public void testBasicGetNextLogic() throws Exception {
 		// Setup
 		final SingleInputGate inputGate = new SingleInputGate(
-			"Test Task Name", new JobID(), new IntermediateDataSetID(), 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+			"Test Task Name", new JobID(),
+			new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
+			0, 2,
+			mock(TaskActions.class),
+			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+
+		assertEquals(ResultPartitionType.PIPELINED, inputGate.getConsumedPartitionType());
 
 		final TestInputChannel[] inputChannels = new TestInputChannel[]{
 			new TestInputChannel(inputGate, 0),
@@ -127,7 +134,12 @@ public class SingleInputGateTest {
 		// Setup reader with one local and one unknown input channel
 		final IntermediateDataSetID resultId = new IntermediateDataSetID();
 
-		final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), resultId, 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+		final SingleInputGate inputGate = new SingleInputGate(
+				"Test Task Name", new JobID(),
+				resultId, ResultPartitionType.PIPELINED,
+				0, 2,
+				mock(TaskActions.class),
+				new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 		final BufferPool bufferPool = mock(BufferPool.class);
 		when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);
 
@@ -179,6 +191,7 @@ public class SingleInputGateTest {
 			"t1",
 			new JobID(),
 			new IntermediateDataSetID(),
+			ResultPartitionType.PIPELINED,
 			0,
 			1,
 			mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
@@ -218,6 +231,7 @@ public class SingleInputGateTest {
 			"InputGate",
 			new JobID(),
 			new IntermediateDataSetID(),
+			ResultPartitionType.PIPELINED,
 			0,
 			1,
 			mock(TaskActions.class),
@@ -303,7 +317,9 @@ public class SingleInputGateTest {
 				partitionIds[2],
 				ResultPartitionLocation.createUnknown())};
 
-		InputGateDeploymentDescriptor gateDesc = new InputGateDeploymentDescriptor(new IntermediateDataSetID(), 0, channelDescs);
+		InputGateDeploymentDescriptor gateDesc =
+			new InputGateDeploymentDescriptor(new IntermediateDataSetID(),
+				ResultPartitionType.PIPELINED, 0, channelDescs);
 
 		int initialBackoff = 137;
 		int maxBackoff = 1001;
@@ -324,6 +340,8 @@ public class SingleInputGateTest {
 			mock(TaskActions.class),
 			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 
+		assertEquals(gateDesc.getConsumedPartitionType(), gate.getConsumedPartitionType());
+
 		Map<IntermediateResultPartitionID, InputChannel> channelMap = gate.getInputChannels();
 
 		assertEquals(3, channelMap.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index fe3b087..18ad490 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
@@ -55,6 +56,7 @@ public class TestSingleInputGate {
 			"Test Task Name",
 			new JobID(),
 			new IntermediateDataSetID(),
+			ResultPartitionType.PIPELINED,
 			0,
 			numberOfInputChannels,
 			mock(TaskActions.class),

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index c05df0a..bc1dd07 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.taskmanager.TaskActions;
@@ -42,8 +43,18 @@ public class UnionInputGateTest {
 	public void testBasicGetNextLogic() throws Exception {
 		// Setup
 		final String testTaskName = "Test Task";
-		final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new IntermediateDataSetID(), 0, 3, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
-		final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new IntermediateDataSetID(), 0, 5, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+		final SingleInputGate ig1 = new SingleInputGate(
+			testTaskName, new JobID(),
+			new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
+			0, 3,
+			mock(TaskActions.class),
+			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+		final SingleInputGate ig2 = new SingleInputGate(
+			testTaskName, new JobID(),
+			new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
+			0, 5,
+			mock(TaskActions.class),
+			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 
 		final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2});
 

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index e26e176..cecfe6a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -113,7 +113,8 @@ public class TaskManagerComponentsStartupShutdownTest {
 				false); // exit-jvm-on-fatal-error
 
 			final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration(
-					32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, 0, 0, null);
+					32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC,
+					0, 0, 2, 8, null);
 
 			ResourceID taskManagerId = ResourceID.generate();
 			
@@ -130,7 +131,9 @@ public class TaskManagerComponentsStartupShutdownTest {
 				null,
 				netConf.ioMode(),
 				netConf.partitionRequestInitialBackoff(),
-				netConf.partitionRequestMaxBackoff());
+				netConf.partitionRequestMaxBackoff(),
+				netConf.networkBuffersPerChannel(),
+				netConf.extraNetworkBuffersPerGate());
 
 			network.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 356d693..0b39e20 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -69,6 +69,7 @@ import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
 import org.apache.flink.runtime.messages.TaskMessages.StopTask;
 import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -91,6 +92,7 @@ import scala.concurrent.duration.FiniteDuration;
 import scala.util.Failure;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.ArrayList;
@@ -637,7 +639,7 @@ public class TaskManagerTest extends TestLogger {
 
 				InputGateDeploymentDescriptor ircdd =
 						new InputGateDeploymentDescriptor(
-								new IntermediateDataSetID(),
+								new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
 								0, new InputChannelDeploymentDescriptor[]{
 										new InputChannelDeploymentDescriptor(new ResultPartitionID(partitionId, eid1), ResultPartitionLocation.createLocal())
 								}
@@ -782,7 +784,7 @@ public class TaskManagerTest extends TestLogger {
 
 				InputGateDeploymentDescriptor ircdd =
 						new InputGateDeploymentDescriptor(
-								new IntermediateDataSetID(),
+								new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
 								0, new InputChannelDeploymentDescriptor[]{
 										new InputChannelDeploymentDescriptor(new ResultPartitionID(partitionId, eid1), ResultPartitionLocation.createLocal())
 								}
@@ -936,7 +938,7 @@ public class TaskManagerTest extends TestLogger {
 								new InputChannelDeploymentDescriptor(partitionId, loc)};
 
 				final InputGateDeploymentDescriptor igdd =
-						new InputGateDeploymentDescriptor(resultId, 0, icdd);
+						new InputGateDeploymentDescriptor(resultId, ResultPartitionType.PIPELINED, 0, icdd);
 
 				final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(
 						jid, "TestJob", vid, eid,
@@ -978,6 +980,25 @@ public class TaskManagerTest extends TestLogger {
 		}};
 	}
 
+	@Test
+	public void testTaskManagerServicesConfiguration() throws Exception {
+
+		// set some non-default values
+		final Configuration config = new Configuration();
+		config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+		config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+		config.setInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL, 10);
+		config.setInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE, 100);
+
+		TaskManagerServicesConfiguration tmConfig =
+			TaskManagerServicesConfiguration.fromConfiguration(config, InetAddress.getByName("localhost"), true);
+
+		assertEquals(100, tmConfig.getNetworkConfig().partitionRequestInitialBackoff());
+		assertEquals(200, tmConfig.getNetworkConfig().partitionRequestMaxBackoff());
+		assertEquals(10, tmConfig.getNetworkConfig().networkBuffersPerChannel());
+		assertEquals(100, tmConfig.getNetworkConfig().extraNetworkBuffersPerGate());
+	}
+
 	/**
 	 *  Tests that repeated local {@link PartitionNotFoundException}s ultimately fail the receiver.
 	 */
@@ -1031,7 +1052,7 @@ public class TaskManagerTest extends TestLogger {
 								new InputChannelDeploymentDescriptor(partitionId, loc)};
 
 				final InputGateDeploymentDescriptor igdd =
-						new InputGateDeploymentDescriptor(resultId, 0, icdd);
+						new InputGateDeploymentDescriptor(resultId, ResultPartitionType.PIPELINED, 0, icdd);
 
 				final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(
 						jid, "TestJob", vid, eid,

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 003eff9..c18b527 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -402,17 +402,17 @@ public class StreamingJobGraphGenerator {
 			jobEdge = downStreamVertex.connectNewDataSetAsInput(
 				headVertex,
 				DistributionPattern.POINTWISE,
-				ResultPartitionType.PIPELINED);
+				ResultPartitionType.PIPELINED_BOUNDED);
 		} else if (partitioner instanceof RescalePartitioner){
 			jobEdge = downStreamVertex.connectNewDataSetAsInput(
 				headVertex,
 				DistributionPattern.POINTWISE,
-				ResultPartitionType.PIPELINED);
+				ResultPartitionType.PIPELINED_BOUNDED);
 		} else {
 			jobEdge = downStreamVertex.connectNewDataSetAsInput(
 					headVertex,
 					DistributionPattern.ALL_TO_ALL,
-					ResultPartitionType.PIPELINED);
+					ResultPartitionType.PIPELINED_BOUNDED);
 		}
 		// set strategy name so that web interface can show it.
 		jobEdge.setShipStrategyName(partitioner.toString());