You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/10 13:42:43 UTC

[1/7] flink git commit: [FLINK-6010] [docs] Correct IntelliJ IDEA Plugins path in 'Installing the Scala plugin' section

Repository: flink
Updated Branches:
  refs/heads/master 7456d78d2 -> 206ea2119


[FLINK-6010] [docs] Correct IntelliJ IDEA Plugins path in 'Installing the Scala plugin' section

This closes #3504


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a87b5270
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a87b5270
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a87b5270

Branch: refs/heads/master
Commit: a87b5270fb62043fd2a3d049ec525889cee04070
Parents: 7456d78
Author: Bowen Li <bo...@gmail.com>
Authored: Thu Mar 9 13:55:07 2017 -0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 10 10:50:04 2017 +0100

----------------------------------------------------------------------
 docs/internals/ide_setup.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a87b5270/docs/internals/ide_setup.md
----------------------------------------------------------------------
diff --git a/docs/internals/ide_setup.md b/docs/internals/ide_setup.md
index f170588..329f4cd 100644
--- a/docs/internals/ide_setup.md
+++ b/docs/internals/ide_setup.md
@@ -59,7 +59,7 @@ The IntelliJ installation setup offers to install the Scala plugin.
 If it is not installed, follow these instructions before importing Flink
 to enable support for Scala projects and files:
 
-1. Go to IntelliJ plugins settings (File -> Settings -> Plugins) and
+1. Go to IntelliJ plugins settings (IntelliJ IDEA -> Preferences -> Plugins) and
    click on "Install Jetbrains plugin...".
 2. Select and install the "Scala" plugin.
 3. Restart IntelliJ


[4/7] flink git commit: [FLINK-5980] [core] Expose max-parallelism value in RuntimeContext.

Posted by se...@apache.org.
[FLINK-5980] [core] Expose max-parallelism value in RuntimeContext.

This closes #3487


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f47e31b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f47e31b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f47e31b

Branch: refs/heads/master
Commit: 6f47e31babe159ec28c8619263d8ea874e56f8fc
Parents: bb1197d
Author: biao.liub <bi...@alibaba-inc.com>
Authored: Tue Mar 7 20:11:10 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 10 13:30:57 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/api/common/TaskInfo.java  | 16 ++++++++--------
 .../flink/api/common/functions/RuntimeContext.java  |  8 ++++++++
 .../functions/util/AbstractRuntimeUDFContext.java   |  7 ++++++-
 .../optimizer/plantranslate/JobGraphGenerator.java  |  3 +++
 .../runtime/executiongraph/ExecutionJobVertex.java  |  9 +++++++--
 .../runtime/executiongraph/TaskInformation.java     | 10 +++++-----
 .../org/apache/flink/runtime/taskmanager/Task.java  |  2 +-
 .../api/functions/async/RichAsyncFunction.java      |  5 +++++
 .../api/operators/AbstractStreamOperator.java       |  5 +++--
 .../util/AbstractStreamOperatorTestHarness.java     |  2 +-
 .../KeyedOneInputStreamOperatorTestHarness.java     |  2 +-
 11 files changed, 48 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6f47e31b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
index 5627ca8..33f2e0c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
@@ -31,20 +31,20 @@ public class TaskInfo {
 
 	private final String taskName;
 	private final String taskNameWithSubtasks;
-	private final int numberOfKeyGroups;
+	private final int maxNumberOfParallelSubtasks;
 	private final int indexOfSubtask;
 	private final int numberOfParallelSubtasks;
 	private final int attemptNumber;
 
-	public TaskInfo(String taskName, int numberOfKeyGroups, int indexOfSubtask, int numberOfParallelSubtasks, int attemptNumber) {
+	public TaskInfo(String taskName, int maxNumberOfParallelSubtasks, int indexOfSubtask, int numberOfParallelSubtasks, int attemptNumber) {
 		checkArgument(indexOfSubtask >= 0, "Task index must be a non-negative number.");
-		checkArgument(numberOfKeyGroups >= 1, "Max parallelism must be a positive number.");
-		checkArgument(numberOfKeyGroups >= numberOfParallelSubtasks, "Max parallelism must be >= than parallelism.");
+		checkArgument(maxNumberOfParallelSubtasks >= 1, "Max parallelism must be a positive number.");
+		checkArgument(maxNumberOfParallelSubtasks >= numberOfParallelSubtasks, "Max parallelism must be >= than parallelism.");
 		checkArgument(numberOfParallelSubtasks >= 1, "Parallelism must be a positive number.");
 		checkArgument(indexOfSubtask < numberOfParallelSubtasks, "Task index must be less than parallelism.");
 		checkArgument(attemptNumber >= 0, "Attempt number must be a non-negative number.");
 		this.taskName = checkNotNull(taskName, "Task Name must not be null.");
-		this.numberOfKeyGroups = numberOfKeyGroups;
+		this.maxNumberOfParallelSubtasks = maxNumberOfParallelSubtasks;
 		this.indexOfSubtask = indexOfSubtask;
 		this.numberOfParallelSubtasks = numberOfParallelSubtasks;
 		this.attemptNumber = attemptNumber;
@@ -61,10 +61,10 @@ public class TaskInfo {
 	}
 
 	/**
-	 * Gets the number of key groups aka the max parallelism aka the max number of subtasks.
+	 * Gets the max parallelism aka the max number of subtasks.
 	 */
-	public int getNumberOfKeyGroups() {
-		return numberOfKeyGroups;
+	public int getMaxNumberOfParallelSubtasks() {
+		return maxNumberOfParallelSubtasks;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/6f47e31b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index 98ad018..2978f3a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -77,6 +77,14 @@ public interface RuntimeContext {
 	int getNumberOfParallelSubtasks();
 
 	/**
+	 * Gets the number of max-parallelism with which the parallel task runs.
+	 *
+	 * @return The max-parallelism with which the parallel task runs.
+	 */
+	@PublicEvolving
+	int getMaxNumberOfParallelSubtasks();
+
+	/**
 	 * Gets the number of this parallel subtask. The numbering starts from 0 and goes up to
 	 * parallelism-1 (parallelism as returned by {@link #getNumberOfParallelSubtasks()}).
 	 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/6f47e31b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 2538799..bcd0ad0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -97,10 +97,15 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
 	}
 
 	@Override
+	public int getMaxNumberOfParallelSubtasks() {
+		return taskInfo.getMaxNumberOfParallelSubtasks();
+	}
+
+	@Override
 	public int getIndexOfThisSubtask() {
 		return taskInfo.getIndexOfThisSubtask();
 	}
-	
+
 	@Override
 	public MetricGroup getMetricGroup() {
 		return metrics;

http://git-wip-us.apache.org/repos/asf/flink/blob/6f47e31b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 61e5327..bbc944e 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -394,6 +394,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			// set parallelism
 			int pd = node.getParallelism();
 			vertex.setParallelism(pd);
+			vertex.setMaxParallelism(pd);
 			
 			vertex.setSlotSharingGroup(sharingGroup);
 			
@@ -1276,6 +1277,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		final JobVertex sync = new JobVertex("Sync(" + bulkNode.getNodeName() + ")");
 		sync.setInvokableClass(IterationSynchronizationSinkTask.class);
 		sync.setParallelism(1);
+		sync.setMaxParallelism(1);
 		this.auxVertices.add(sync);
 		
 		final TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
@@ -1412,6 +1414,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			final JobVertex sync = new JobVertex("Sync (" + iterNode.getNodeName() + ")");
 			sync.setInvokableClass(IterationSynchronizationSinkTask.class);
 			sync.setParallelism(1);
+			sync.setMaxParallelism(1);
 			this.auxVertices.add(sync);
 			
 			syncConfig = new TaskConfig(sync.getConfiguration());

http://git-wip-us.apache.org/repos/asf/flink/blob/6f47e31b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 754148e..852d530 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.Archiveable;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
@@ -223,10 +224,14 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	}
 
 	private void setMaxParallelismInternal(int maxParallelism) {
+		if (maxParallelism == ExecutionConfig.PARALLELISM_AUTO_MAX) {
+			maxParallelism = KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
+		}
+
 		Preconditions.checkArgument(maxParallelism > 0
 						&& maxParallelism <= KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
-				"Overriding max parallelism is not in valid bounds (1.." +
-						KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM + "), found:" + maxParallelism);
+				"Overriding max parallelism is not in valid bounds (1..%s), found: %s",
+				KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM, maxParallelism);
 
 		this.maxParallelism = maxParallelism;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6f47e31b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java
index ccec118..7fb68e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java
@@ -42,7 +42,7 @@ public class TaskInformation implements Serializable {
 	private final int numberOfSubtasks;
 
 	/** The maximum parallelism == number of key groups */
-	private final int numberOfKeyGroups;
+	private final int maxNumberOfSubtaks;
 
 	/** Class name of the invokable to run */
 	private final String invokableClassName;
@@ -54,13 +54,13 @@ public class TaskInformation implements Serializable {
 			JobVertexID jobVertexId,
 			String taskName,
 			int numberOfSubtasks,
-			int numberOfKeyGroups,
+			int maxNumberOfSubtaks,
 			String invokableClassName,
 			Configuration taskConfiguration) {
 		this.jobVertexId = Preconditions.checkNotNull(jobVertexId);
 		this.taskName = Preconditions.checkNotNull(taskName);
 		this.numberOfSubtasks = Preconditions.checkNotNull(numberOfSubtasks);
-		this.numberOfKeyGroups = Preconditions.checkNotNull(numberOfKeyGroups);
+		this.maxNumberOfSubtaks = Preconditions.checkNotNull(maxNumberOfSubtaks);
 		this.invokableClassName = Preconditions.checkNotNull(invokableClassName);
 		this.taskConfiguration = Preconditions.checkNotNull(taskConfiguration);
 	}
@@ -77,8 +77,8 @@ public class TaskInformation implements Serializable {
 		return numberOfSubtasks;
 	}
 
-	public int getNumberOfKeyGroups() {
-		return numberOfKeyGroups;
+	public int getMaxNumberOfSubtaks() {
+		return maxNumberOfSubtaks;
 	}
 
 	public String getInvokableClassName() {

http://git-wip-us.apache.org/repos/asf/flink/blob/6f47e31b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 8732c60..b0f0eb8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -291,7 +291,7 @@ public class Task implements Runnable, TaskActions {
 
 		this.taskInfo = new TaskInfo(
 				taskInformation.getTaskName(),
-				taskInformation.getNumberOfKeyGroups(),
+				taskInformation.getMaxNumberOfSubtaks(),
 				subtaskIndex,
 				taskInformation.getNumberOfSubtasks(),
 				attemptNumber);

http://git-wip-us.apache.org/repos/asf/flink/blob/6f47e31b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
index 7971460..8c788b4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java
@@ -120,6 +120,11 @@ public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction im
 		}
 
 		@Override
+		public int getMaxNumberOfParallelSubtasks() {
+			return runtimeContext.getMaxNumberOfParallelSubtasks();
+		}
+
+		@Override
 		public int getIndexOfThisSubtask() {
 			return runtimeContext.getIndexOfThisSubtask();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/6f47e31b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index a81056f..6e6b147 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -283,13 +283,14 @@ public abstract class AbstractStreamOperator<OUT>
 			// create a keyed state backend if there is keyed state, as indicated by the presence of a key serializer
 			if (null != keySerializer) {
 				KeyGroupRange subTaskKeyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
-						container.getEnvironment().getTaskInfo().getNumberOfKeyGroups(),
+						container.getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks(),
 						container.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks(),
 						container.getEnvironment().getTaskInfo().getIndexOfThisSubtask());
 
 				this.keyedStateBackend = container.createKeyedStateBackend(
 						keySerializer,
-						container.getEnvironment().getTaskInfo().getNumberOfKeyGroups(),
+						// The maximum parallelism == number of key group
+						container.getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks(),
 						subTaskKeyGroupRange);
 
 				this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());

http://git-wip-us.apache.org/repos/asf/flink/blob/6f47e31b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 07424f7..c984eed 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -339,7 +339,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 		}
 
 		if (operatorStateHandles != null) {
-			int numKeyGroups = getEnvironment().getTaskInfo().getNumberOfKeyGroups();
+			int numKeyGroups = getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks();
 			int numSubtasks = getEnvironment().getTaskInfo().getNumberOfParallelSubtasks();
 			int subtaskIndex = getEnvironment().getTaskInfo().getIndexOfThisSubtask();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6f47e31b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index effb44c..d45ae21 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -211,7 +211,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 	@Override
 	public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception {
 		if (operatorStateHandles != null) {
-			int numKeyGroups = getEnvironment().getTaskInfo().getNumberOfKeyGroups();
+			int numKeyGroups = getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks();
 			int numSubtasks = getEnvironment().getTaskInfo().getNumberOfParallelSubtasks();
 			int subtaskIndex = getEnvironment().getTaskInfo().getIndexOfThisSubtask();
 


[3/7] flink git commit: [FLINK-5976] [tests] Deduplicate Tokenizer in tests

Posted by se...@apache.org.
[FLINK-5976] [tests] Deduplicate Tokenizer in tests

This closes #3485


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e5e6da0b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e5e6da0b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e5e6da0b

Branch: refs/heads/master
Commit: e5e6da0b38495d4030fd0b790a81ca70f9ce84b2
Parents: 6f47e31
Author: liuyuzhong7 <li...@gmail.com>
Authored: Tue Mar 7 19:15:40 2017 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 10 13:30:57 2017 +0100

----------------------------------------------------------------------
 .../clients/examples/LocalExecutorITCase.java   | 29 ++++------------
 .../hadoop/mapred/WordCountMapredITCase.java    | 19 +----------
 .../mapreduce/WordCountMapreduceITCase.java     | 19 +----------
 .../api/outputformat/CsvOutputFormatITCase.java | 21 +-----------
 .../outputformat/TextOutputFormatITCase.java    |  3 +-
 .../flink/test/testfunctions/Tokenizer.java     | 36 ++++++++++++++++++++
 6 files changed, 48 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e5e6da0b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
index 4ed28a8..95d1ab2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
@@ -20,15 +20,12 @@ package org.apache.flink.test.clients.examples;
 
 import java.io.File;
 import java.io.FileWriter;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.LocalExecutor;
 import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.util.Collector;
+import org.apache.flink.test.testfunctions.Tokenizer;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -45,7 +42,7 @@ public class LocalExecutorITCase {
 			File outFile = File.createTempFile("wctext", ".out");
 			inFile.deleteOnExit();
 			outFile.deleteOnExit();
-			
+
 			try (FileWriter fw = new FileWriter(inFile)) {
 				fw.write(WordCountData.TEXT);
 			}
@@ -64,27 +61,15 @@ public class LocalExecutorITCase {
 			Assert.fail(e.getMessage());
 		}
 	}
-	
+
 	private Plan getWordCountPlan(File inFile, File outFile, int parallelism) {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(parallelism);
 		env.readTextFile(inFile.getAbsolutePath())
-				.flatMap(new Tokenizer())
-				.groupBy(0)
-				.sum(1)
-				.writeAsCsv(outFile.getAbsolutePath());
+			.flatMap(new Tokenizer())
+			.groupBy(0)
+			.sum(1)
+			.writeAsCsv(outFile.getAbsolutePath());
 		return env.createProgramPlan();
 	}
-
-	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
-		@Override
-		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
-			String[] tokens = value.toLowerCase().split("\\W+");
-			for (String token : tokens) {
-				if (token.length() > 0) {
-					out.collect(new Tuple2<>(token, 1));
-				}
-			}
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e5e6da0b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
index 9528d94..7c1b30e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.hadoop.mapred;
 
-import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -27,7 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import static org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.util.Collector;
+import org.apache.flink.test.testfunctions.Tokenizer;
 import org.apache.flink.util.OperatingSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
@@ -115,20 +114,4 @@ public class WordCountMapredITCase extends JavaProgramTestBase {
 		words.output(hadoopOutputFormat);
 		env.execute("Hadoop Compat WordCount");
 	}
-
-	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
-
-		@Override
-		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
-			// normalize and split the line
-			String[] tokens = value.toLowerCase().split("\\W+");
-
-			// emit the pairs
-			for (String token : tokens) {
-				if (token.length() > 0) {
-					out.collect(new Tuple2<String, Integer>(token, 1));
-				}
-			}
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e5e6da0b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
index 64062d2..fbc1994 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.hadoop.mapreduce;
 
-import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -27,7 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import static org.apache.flink.hadoopcompatibility.HadoopInputs.readHadoopFile;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.util.Collector;
+import org.apache.flink.test.testfunctions.Tokenizer;
 import org.apache.flink.util.OperatingSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
@@ -115,20 +114,4 @@ public class WordCountMapreduceITCase extends JavaProgramTestBase {
 		words.output(hadoopOutputFormat);
 		env.execute("Hadoop Compat WordCount");
 	}
-
-	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
-
-		@Override
-		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
-			// normalize and split the line
-			String[] tokens = value.toLowerCase().split("\\W+");
-
-			// emit the pairs
-			for (String token : tokens) {
-				if (token.length() > 0) {
-					out.collect(new Tuple2<String, Integer>(token, 1));
-				}
-			}
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e5e6da0b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java
index c2155ac..61dddb8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/CsvOutputFormatITCase.java
@@ -17,13 +17,12 @@
 
 package org.apache.flink.test.streaming.api.outputformat;
 
-import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.StreamingProgramTestBase;
 import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.util.Collector;
+import org.apache.flink.test.testfunctions.Tokenizer;
 
 public class CsvOutputFormatITCase extends StreamingProgramTestBase {
 
@@ -56,23 +55,5 @@ public class CsvOutputFormatITCase extends StreamingProgramTestBase {
 				.replaceAll("[\\\\(\\\\)]", ""), resultPath);
 	}
 
-	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
-				throws Exception {
-			// normalize and split the line
-			String[] tokens = value.toLowerCase().split("\\W+");
-
-			// emit the pairs
-			for (String token : tokens) {
-				if (token.length() > 0) {
-					out.collect(new Tuple2<String, Integer>(token, 1));
-				}
-			}
-		}
-	}
-
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e5e6da0b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java
index 2940e6d..6ea864e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/outputformat/TextOutputFormatITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.StreamingProgramTestBase;
 import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.testfunctions.Tokenizer;
 
 public class TextOutputFormatITCase extends StreamingProgramTestBase {
 
@@ -39,7 +40,7 @@ public class TextOutputFormatITCase extends StreamingProgramTestBase {
 		DataStream<String> text = env.fromElements(WordCountData.TEXT);
 
 		DataStream<Tuple2<String, Integer>> counts = text
-				.flatMap(new CsvOutputFormatITCase.Tokenizer())
+				.flatMap(new Tokenizer())
 				.keyBy(0).sum(1);
 
 		counts.writeAsText(resultPath);

http://git-wip-us.apache.org/repos/asf/flink/blob/e5e6da0b/flink-tests/src/test/java/org/apache/flink/test/testfunctions/Tokenizer.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/testfunctions/Tokenizer.java b/flink-tests/src/test/java/org/apache/flink/test/testfunctions/Tokenizer.java
new file mode 100644
index 0000000..9b3764d
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/testfunctions/Tokenizer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.testfunctions;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+public final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+
+	@Override
+	public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+		String[] tokens = value.toLowerCase().split("\\W+");
+		for (String token : tokens) {
+			if (token.length() > 0) {
+				out.collect(new Tuple2<>(token, 1));
+			}
+		}
+	}
+}


[2/7] flink git commit: [FLINK-6005] [misc] Fix some ArrayList initializations without initial size

Posted by se...@apache.org.
[FLINK-6005] [misc] Fix some ArrayList initializations without initial size

This closes #3499


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bb1197d9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bb1197d9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bb1197d9

Branch: refs/heads/master
Commit: bb1197d9162a630c87d47defc5d0c42de6726feb
Parents: a87b527
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Thu Mar 9 12:24:33 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 10 10:58:07 2017 +0100

----------------------------------------------------------------------
 .../api/common/typeutils/base/ListSerializerTest.java     |  5 +++--
 .../apache/flink/graph/library/SummarizationITCase.java   |  5 +++--
 .../java/org/apache/flink/graph/generator/TestUtils.java  | 10 ++++++----
 .../io/network/serialization/LargeRecordsTest.java        |  8 ++++----
 4 files changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bb1197d9/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerTest.java
index 28cdc13..2de6bec 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ListSerializerTest.java
@@ -71,8 +71,9 @@ public class ListSerializerTest extends SerializerTestBase<List<Long>> {
 			list7.add(rnd.nextLong());
 		}
 
-		final List<Long> list8 = new ArrayList<>();
-		for (int i = 0; i < rnd.nextInt(200); i++) {
+		int list8Len = rnd.nextInt(200);
+		final List<Long> list8 = new ArrayList<>(list8Len);
+		for (int i = 0; i < list8Len; i++) {
 			list8.add(rnd.nextLong());
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb1197d9/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java
index 43514dc..fe4cd24 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java
@@ -185,8 +185,9 @@ public class SummarizationITCase extends MultipleProgramsTestBase {
 	}
 
 	private List<Long> getListFromIdRange(String idRange) {
-		List<Long> result = new ArrayList<>();
-		for (String id : ID_SEPARATOR.split(idRange)) {
+		String[] split = ID_SEPARATOR.split(idRange);
+		List<Long> result = new ArrayList<>(split.length);
+		for (String id : split) {
 			result.add(Long.parseLong(id));
 		}
 		return result;

http://git-wip-us.apache.org/repos/asf/flink/blob/bb1197d9/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
index 2cf9eac..23ad31c 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
@@ -57,9 +57,10 @@ public final class TestUtils {
 	private static <K, VV, EV> void compareVertices(Graph<K, VV, EV> graph, String expectedVertices)
 			throws Exception {
 		if (expectedVertices != null) {
-			List<String> resultVertices = new ArrayList<>();
+			List<Vertex<K, VV>> vertices = graph.getVertices().collect();
+			List<String> resultVertices = new ArrayList<>(vertices.size());
 
-			for (Vertex<K, VV> vertex : graph.getVertices().collect()) {
+			for (Vertex<K, VV> vertex : vertices) {
 				resultVertices.add(vertex.f0.toString());
 			}
 
@@ -70,9 +71,10 @@ public final class TestUtils {
 	private static <K, VV, EV> void compareEdges(Graph<K, VV, EV> graph, String expectedEdges)
 			throws Exception {
 		if (expectedEdges != null) {
-			List<String> resultEdges = new ArrayList<>();
+			List<Edge<K, EV>> edges = graph.getEdges().collect();
+			List<String> resultEdges = new ArrayList<>(edges.size());
 
-			for (Edge<K, EV> edge : graph.getEdges().collect()) {
+			for (Edge<K, EV> edge : edges) {
 				resultEdges.add(edge.f0.toString() + "," + edge.f1.toString());
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb1197d9/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
index 1574fe9..25523db 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
@@ -54,8 +54,8 @@ public class LargeRecordsTest {
 
 			final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), mock(BufferRecycler.class));
 
-			List<SerializationTestType> originalRecords = new ArrayList<SerializationTestType>();
-			List<SerializationTestType> deserializedRecords = new ArrayList<SerializationTestType>();
+			List<SerializationTestType> originalRecords = new ArrayList<SerializationTestType>((NUM_RECORDS + 1) / 2);
+			List<SerializationTestType> deserializedRecords = new ArrayList<SerializationTestType>((NUM_RECORDS + 1) / 2);
 			
 			LargeObjectType genLarge = new LargeObjectType();
 			
@@ -154,8 +154,8 @@ public class LargeRecordsTest {
 
 			final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), mock(BufferRecycler.class));
 
-			List<SerializationTestType> originalRecords = new ArrayList<SerializationTestType>();
-			List<SerializationTestType> deserializedRecords = new ArrayList<SerializationTestType>();
+			List<SerializationTestType> originalRecords = new ArrayList<>((NUM_RECORDS + 1) / 2);
+			List<SerializationTestType> deserializedRecords = new ArrayList<>((NUM_RECORDS + 1) / 2);
 			
 			LargeObjectType genLarge = new LargeObjectType();
 			


[7/7] flink git commit: [FLINK-4545] [network] Small adjustments to LocalBufferPool with limited the number of used buffers

Posted by se...@apache.org.
[FLINK-4545] [network] Small adjustments to LocalBufferPool with limited the number of used buffers


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/206ea211
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/206ea211
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/206ea211

Branch: refs/heads/master
Commit: 206ea2119ce80c7d4920a471eb58c1d30abbd995
Parents: 11e2aa6
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Mar 10 12:25:17 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 10 13:30:58 2017 +0100

----------------------------------------------------------------------
 .../io/network/buffer/LocalBufferPool.java      |  7 ++---
 .../io/network/buffer/NetworkBufferPool.java    | 27 +++++++++++++-------
 .../runtime/taskmanager/TaskManagerTest.java    |  2 +-
 3 files changed, 23 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/206ea211/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index a587997..b485fd1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -58,7 +58,7 @@ class LocalBufferPool implements BufferPool {
 	 * The currently available memory segments. These are segments, which have been requested from
 	 * the network buffer pool and are currently not handed out as Buffer instances.
 	 */
-	private final Queue<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();
+	private final ArrayDeque<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();
 
 	/**
 	 * Buffer availability listeners, which need to be notified when a Buffer becomes available.
@@ -297,8 +297,9 @@ class LocalBufferPool implements BufferPool {
 	@Override
 	public void setNumBuffers(int numBuffers) throws IOException {
 		synchronized (availableMemorySegments) {
-			checkArgument(numBuffers >= numberOfRequiredMemorySegments, "Buffer pool needs at least " +
-				numberOfRequiredMemorySegments + " buffers, but tried to set to " + numBuffers + ".");
+			checkArgument(numBuffers >= numberOfRequiredMemorySegments,
+					"Buffer pool needs at least %s buffers, but tried to set to %s",
+					numberOfRequiredMemorySegments, numBuffers);
 
 			if (numBuffers > maxNumberOfMemorySegments) {
 				currentPoolSize = maxNumberOfMemorySegments;

http://git-wip-us.apache.org/repos/asf/flink/blob/206ea211/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 7668759..5f2da03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -22,13 +22,13 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.HashSet;
-import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 
@@ -50,7 +50,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
 
 	private final int memorySegmentSize;
 
-	private final Queue<MemorySegment> availableMemorySegments;
+	private final ArrayBlockingQueue<MemorySegment> availableMemorySegments;
 
 	private volatile boolean isDestroyed;
 
@@ -124,9 +124,10 @@ public class NetworkBufferPool implements BufferPoolFactory {
 		return availableMemorySegments.poll();
 	}
 
-	// This is not safe with regard to destroy calls, but it does not hurt, because destroy happens
-	// only once at clean up time (task manager shutdown).
 	public void recycle(MemorySegment segment) {
+		// Adds the segment back to the queue, which does not immediately free the memory
+		// however, since this happens when references to the global pool are also released,
+		// making the availableMemorySegments queue and its contained object reclaimable
 		availableMemorySegments.add(segment);
 	}
 
@@ -260,7 +261,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
 		assert Thread.holdsLock(factoryLock);
 
 		// All buffers, which are not among the required ones
-		int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
+		final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
 
 		if (numAvailableMemorySegment == 0) {
 			// in this case, we need to redistribute buffers so that every pool gets its minimum
@@ -278,7 +279,8 @@ public class NetworkBufferPool implements BufferPoolFactory {
 		 * a ratio that we use to distribute the buffers.
 		 */
 
-		int totalCapacity = 0;
+		long totalCapacity = 0; // long to avoid int overflow
+
 		for (LocalBufferPool bufferPool : allBufferPools) {
 			int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
 				bufferPool.getNumberOfRequiredMemorySegments();
@@ -290,9 +292,13 @@ public class NetworkBufferPool implements BufferPoolFactory {
 			return; // necessary to avoid div by zero when nothing to re-distribute
 		}
 
-		int memorySegmentsToDistribute = Math.min(numAvailableMemorySegment, totalCapacity);
+		// since one of the arguments of 'min(a,b)' is a positive int, this is actually
+		// guaranteed to be within the 'int' domain
+		// (we use a checked downCast to handle possible bugs more gracefully).
+		final int memorySegmentsToDistribute = MathUtils.checkedDownCast(
+				Math.min(numAvailableMemorySegment, totalCapacity));
 
-		int totalPartsUsed = 0; // of totalCapacity
+		long totalPartsUsed = 0; // of totalCapacity
 		int numDistributedMemorySegment = 0;
 		for (LocalBufferPool bufferPool : allBufferPools) {
 			int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
@@ -307,7 +313,10 @@ public class NetworkBufferPool implements BufferPoolFactory {
 
 			// avoid remaining buffers by looking at the total capacity that should have been
 			// re-distributed up until here
-			int mySize = memorySegmentsToDistribute * totalPartsUsed / totalCapacity - numDistributedMemorySegment;
+			// the downcast will always succeed, because both arguments of the subtraction are in the 'int' domain
+			final int mySize = MathUtils.checkedDownCast(
+					memorySegmentsToDistribute * totalPartsUsed / totalCapacity - numDistributedMemorySegment);
+
 			numDistributedMemorySegment += mySize;
 			bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + mySize);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/206ea211/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 0b39e20..730595c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -991,7 +991,7 @@ public class TaskManagerTest extends TestLogger {
 		config.setInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE, 100);
 
 		TaskManagerServicesConfiguration tmConfig =
-			TaskManagerServicesConfiguration.fromConfiguration(config, InetAddress.getByName("localhost"), true);
+			TaskManagerServicesConfiguration.fromConfiguration(config, InetAddress.getLoopbackAddress(), true);
 
 		assertEquals(tmConfig.getNetworkConfig().partitionRequestInitialBackoff(), 100);
 		assertEquals(tmConfig.getNetworkConfig().partitionRequestMaxBackoff(), 200);


[5/7] flink git commit: [FLINK-4545] [network] Allow LocalBufferPool to limited the number of used buffers

Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 4d462d0..7c51bc2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.graph;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
@@ -32,6 +33,7 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
@@ -152,10 +154,17 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 		StreamGraph streamGraph = env.getStreamGraph();
 		streamGraph.setJobName("test job");
 		JobGraph jobGraph = streamGraph.getJobGraph();
+		List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
 		
 		assertEquals(2, jobGraph.getNumberOfVertices());
-		assertEquals(1, jobGraph.getVerticesAsArray()[0].getParallelism());
-		assertEquals(1, jobGraph.getVerticesAsArray()[1].getParallelism());
+		assertEquals(1, verticesSorted.get(0).getParallelism());
+		assertEquals(1, verticesSorted.get(1).getParallelism());
+
+		JobVertex sourceVertex = verticesSorted.get(0);
+		JobVertex mapSinkVertex = verticesSorted.get(1);
+
+		assertEquals(ResultPartitionType.PIPELINED_BOUNDED, sourceVertex.getProducedDataSets().get(0).getResultType());
+		assertEquals(ResultPartitionType.PIPELINED_BOUNDED, mapSinkVertex.getInputs().get(0).getSource().getResultType());
 	}
 
 	/**
@@ -191,8 +200,12 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 			.print();
 		JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph();
 
-		JobVertex sourceVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
-		JobVertex mapPrintVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
+		List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
+		JobVertex sourceVertex = verticesSorted.get(0);
+		JobVertex mapPrintVertex = verticesSorted.get(1);
+
+		assertEquals(ResultPartitionType.PIPELINED_BOUNDED, sourceVertex.getProducedDataSets().get(0).getResultType());
+		assertEquals(ResultPartitionType.PIPELINED_BOUNDED, mapPrintVertex.getInputs().get(0).getSource().getResultType());
 
 		StreamConfig sourceConfig = new StreamConfig(sourceVertex.getConfiguration());
 		StreamConfig mapConfig = new StreamConfig(mapPrintVertex.getConfiguration());

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index ec55f19..4344a46 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -51,8 +51,8 @@ public class BarrierBufferMassiveRandomTest {
 		try {
 			ioMan = new IOManagerAsync();
 			
-			BufferPool pool1 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100);
-			BufferPool pool2 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100);
+			BufferPool pool1 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100, 100);
+			BufferPool pool2 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100, 100);
 
 			RandomGeneratingInputGate myIG = new RandomGeneratingInputGate(
 					new BufferPool[] { pool1, pool2 },


[6/7] flink git commit: [FLINK-4545] [network] Allow LocalBufferPool to limited the number of used buffers

Posted by se...@apache.org.
[FLINK-4545] [network] Allow LocalBufferPool to limited the number of used buffers

Use "a * <number of channels> + b" buffers for bounded pipelined partitions:

Default: a = 2, b = 8
* 1 buffer for in-flight data in the subpartition/input channel
* 1 buffer for parallel serialization
* + some extra buffers (8 seems a good default given bandwidth-delay products of current networks)

This closes #3480


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/11e2aa6d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/11e2aa6d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/11e2aa6d

Branch: refs/heads/master
Commit: 11e2aa6dcdbf42992cda57a5b50d5c29b4facf2d
Parents: e5e6da0
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Feb 10 14:53:09 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 10 13:30:58 2017 +0100

----------------------------------------------------------------------
 .../flink/configuration/TaskManagerOptions.java |  14 ++
 .../BackPressureStatsTrackerITCase.java         |   2 +-
 .../InputGateDeploymentDescriptor.java          |  15 ++
 .../runtime/executiongraph/ExecutionVertex.java |   7 +-
 .../runtime/io/network/NetworkEnvironment.java  |  23 ++-
 .../runtime/io/network/buffer/BufferPool.java   |  10 +-
 .../io/network/buffer/BufferPoolFactory.java    |   7 +-
 .../io/network/buffer/LocalBufferPool.java      |  57 ++++++-
 .../io/network/buffer/NetworkBufferPool.java    |  64 +++++--
 .../netty/PartitionRequestServerHandler.java    |   2 +-
 .../io/network/partition/ResultPartition.java   |  10 ++
 .../network/partition/ResultPartitionType.java  |  32 +++-
 .../partition/consumer/SingleInputGate.java     |  18 +-
 .../taskexecutor/TaskManagerServices.java       |   4 +-
 .../TaskManagerServicesConfiguration.java       |  17 +-
 .../NetworkEnvironmentConfiguration.scala       |   2 +
 .../io/network/MockNetworkEnvironment.java      |   2 +-
 .../io/network/NetworkEnvironmentTest.java      | 165 ++++++++++++++++++
 .../io/network/api/writer/RecordWriterTest.java |   2 +-
 .../network/buffer/BufferPoolFactoryTest.java   | 170 ++++++++++++++++---
 .../io/network/buffer/LocalBufferPoolTest.java  |  58 +++++++
 .../network/buffer/NetworkBufferPoolTest.java   |  96 ++++++++---
 .../partition/InputGateConcurrentTest.java      |   6 +-
 .../partition/InputGateFairnessTest.java        |   3 +-
 .../consumer/LocalInputChannelTest.java         |   6 +-
 .../partition/consumer/SingleInputGateTest.java |  24 ++-
 .../partition/consumer/TestSingleInputGate.java |   2 +
 .../partition/consumer/UnionInputGateTest.java  |  15 +-
 ...askManagerComponentsStartupShutdownTest.java |   7 +-
 .../runtime/taskmanager/TaskManagerTest.java    |  29 +++-
 .../api/graph/StreamingJobGraphGenerator.java   |   6 +-
 .../graph/StreamingJobGraphGeneratorTest.java   |  21 ++-
 .../io/BarrierBufferMassiveRandomTest.java      |   4 +-
 33 files changed, 790 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index b7ee20a..b891e35 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -53,6 +53,20 @@ public class TaskManagerOptions {
 			key("taskmanager.net.request-backoff.max")
 			.defaultValue(10000);
 
+	/**
+	 * Number of network buffers to use for each outgoing/ingoing channel (subpartition/input channel).
+	 *
+	 * Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization
+	 */
+	public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
+		key("taskmanager.net.memory.buffers-per-channel")
+			.defaultValue(2);
+
+	/** Number of extra network buffers to use for each outgoing/ingoing gate (result partition/input gate). */
+	public static final ConfigOption<Integer> NETWORK_EXTRA_BUFFERS_PER_GATE =
+		key("taskmanager.net.memory.extra-buffers-per-gate")
+			.defaultValue(8);
+
 	// ------------------------------------------------------------------------
 	//  Task Options
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
index 1943129..46f8be6 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -108,7 +108,7 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
 			//
 			// 1) Consume all buffers at first (no buffers for the test task)
 			//
-			testBufferPool = networkBufferPool.createBufferPool(1);
+			testBufferPool = networkBufferPool.createBufferPool(1, Integer.MAX_VALUE);
 			final List<Buffer> buffers = new ArrayList<>();
 			while (true) {
 				Buffer buffer = testBufferPool.requestBuffer();

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
index dde1ed7..9bf724a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.deployment;
 
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -46,6 +47,9 @@ public class InputGateDeploymentDescriptor implements Serializable {
 	 */
 	private final IntermediateDataSetID consumedResultId;
 
+	/** The type of the partition the input gate is going to consume. */
+	private final ResultPartitionType consumedPartitionType;
+
 	/**
 	 * The index of the consumed subpartition of each consumed partition. This index depends on the
 	 * {@link DistributionPattern} and the subtask indices of the producing and consuming task.
@@ -57,10 +61,12 @@ public class InputGateDeploymentDescriptor implements Serializable {
 
 	public InputGateDeploymentDescriptor(
 			IntermediateDataSetID consumedResultId,
+			ResultPartitionType consumedPartitionType,
 			int consumedSubpartitionIndex,
 			InputChannelDeploymentDescriptor[] inputChannels) {
 
 		this.consumedResultId = checkNotNull(consumedResultId);
+		this.consumedPartitionType = checkNotNull(consumedPartitionType);
 
 		checkArgument(consumedSubpartitionIndex >= 0);
 		this.consumedSubpartitionIndex = consumedSubpartitionIndex;
@@ -72,6 +78,15 @@ public class InputGateDeploymentDescriptor implements Serializable {
 		return consumedResultId;
 	}
 
+	/**
+	 * Returns the type of this input channel's consumed result partition.
+	 *
+	 * @return consumed result partition type
+	 */
+	public ResultPartitionType getConsumedPartitionType() {
+		return consumedPartitionType;
+	}
+
 	public int getConsumedSubpartitionIndex() {
 		return consumedSubpartitionIndex;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index ca8e07c..21af73a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -687,9 +688,11 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 
 			int queueToRequest = subTaskIndex % numConsumerEdges;
 
-			IntermediateDataSetID resultId = edges[0].getSource().getIntermediateResult().getId();
+			IntermediateResult consumedIntermediateResult = edges[0].getSource().getIntermediateResult();
+			final IntermediateDataSetID resultId = consumedIntermediateResult.getId();
+			final ResultPartitionType partitionType = consumedIntermediateResult.getResultType();
 
-			consumedPartitions.add(new InputGateDeploymentDescriptor(resultId, queueToRequest, partitions));
+			consumedPartitions.add(new InputGateDeploymentDescriptor(resultId, partitionType, queueToRequest, partitions));
 		}
 
 		SerializedValue<JobInformation> serializedJobInformation = getExecutionGraph().getSerializedJobInformation();

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 8e85ffe..4d4b305 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -73,6 +73,11 @@ public class NetworkEnvironment {
 
 	private final int partitionRequestMaxBackoff;
 
+	/** Number of network buffers to use for each outgoing/ingoing channel (subpartition/input channel). */
+	private final int networkBuffersPerChannel;
+	/** Number of extra network buffers to use for each outgoing/ingoing gate (result partition/input gate). */
+	private final int extraNetworkBuffersPerGate;
+
 	private boolean isShutdown;
 
 	public NetworkEnvironment(
@@ -84,7 +89,9 @@ public class NetworkEnvironment {
 			KvStateServer kvStateServer,
 			IOMode defaultIOMode,
 			int partitionRequestInitialBackoff,
-			int partitionRequestMaxBackoff) {
+			int partitionRequestMaxBackoff,
+			int networkBuffersPerChannel,
+			int extraNetworkBuffersPerGate) {
 
 		this.networkBufferPool = checkNotNull(networkBufferPool);
 		this.connectionManager = checkNotNull(connectionManager);
@@ -100,6 +107,8 @@ public class NetworkEnvironment {
 		this.partitionRequestMaxBackoff = partitionRequestMaxBackoff;
 
 		isShutdown = false;
+		this.networkBuffersPerChannel = networkBuffersPerChannel;
+		this.extraNetworkBuffersPerGate = extraNetworkBuffersPerGate;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -171,7 +180,11 @@ public class NetworkEnvironment {
 				BufferPool bufferPool = null;
 
 				try {
-					bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions());
+					int maxNumberOfMemorySegments = partition.getPartitionType().isBounded() ?
+						partition.getNumberOfSubpartitions() * networkBuffersPerChannel +
+							extraNetworkBuffersPerGate : Integer.MAX_VALUE;
+					bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(),
+							maxNumberOfMemorySegments);
 					partition.registerBufferPool(bufferPool);
 
 					resultPartitionManager.registerResultPartition(partition);
@@ -198,7 +211,11 @@ public class NetworkEnvironment {
 				BufferPool bufferPool = null;
 
 				try {
-					bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels());
+					int maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
+						gate.getNumberOfInputChannels() * networkBuffersPerChannel +
+							extraNetworkBuffersPerGate : Integer.MAX_VALUE;
+					bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(),
+						maxNumberOfMemorySegments);
 					gate.setBufferPool(bufferPool);
 				} catch (Throwable t) {
 					if (bufferPool != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
index 8784b14..a4928ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
@@ -50,6 +50,13 @@ public interface BufferPool extends BufferProvider, BufferRecycler {
 	int getNumberOfRequiredMemorySegments();
 
 	/**
+	 * Returns the maximum number of memory segments this buffer pool should use
+	 *
+	 * @return maximum number of memory segments to use or <tt>-1</tt> if unlimited
+	 */
+	int getMaxNumberOfMemorySegments();
+
+	/**
 	 * Returns the current size of this buffer pool.
 	 *
 	 * <p> The size of the buffer pool can change dynamically at runtime.
@@ -59,7 +66,7 @@ public interface BufferPool extends BufferProvider, BufferRecycler {
 	/**
 	 * Sets the current size of this buffer pool.
 	 *
-	 * <p> The size needs to be greater or equals to the guaranteed number of memory segments.
+	 * <p> The size needs to be greater or equal to the guaranteed number of memory segments.
 	 */
 	void setNumBuffers(int numBuffers) throws IOException;
 
@@ -72,5 +79,4 @@ public interface BufferPool extends BufferProvider, BufferRecycler {
 	 * Returns the number of used buffers of this buffer pool.
 	 */
 	int bestEffortGetNumOfUsedBuffers();
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
index e953158..ffed432 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
@@ -30,8 +30,13 @@ public interface BufferPoolFactory {
 	 * buffers.
 	 *
 	 * <p> The buffer pool is of dynamic size with at least <tt>numRequiredBuffers</tt> buffers.
+	 *
+	 * @param numRequiredBuffers
+	 * 		minimum number of network buffers in this pool
+	 * @param maxUsedBuffers
+	 * 		maximum number of network buffers this pool offers
 	 */
-	BufferPool createBufferPool(int numRequiredBuffers) throws IOException;
+	BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers) throws IOException;
 
 	/**
 	 * Destroy callback for updating factory book keeping.

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index d6a4cf7..a587997 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.io.network.buffer;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.util.event.EventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -44,6 +46,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * match its new size.
  */
 class LocalBufferPool implements BufferPool {
+	private static final Logger LOG = LoggerFactory.getLogger(LocalBufferPool.class);
 
 	/** Global network buffer pool to get buffers from. */
 	private final NetworkBufferPool networkBufferPool;
@@ -63,6 +66,9 @@ class LocalBufferPool implements BufferPool {
 	 */
 	private final Queue<EventListener<Buffer>> registeredListeners = new ArrayDeque<EventListener<Buffer>>();
 
+	/** Maximum number of network buffers to allocate. */
+	private final int maxNumberOfMemorySegments;
+
 	/** The current size of this pool */
 	private int currentPoolSize;
 
@@ -86,9 +92,37 @@ class LocalBufferPool implements BufferPool {
 	 * 		minimum number of network buffers
 	 */
 	LocalBufferPool(NetworkBufferPool networkBufferPool, int numberOfRequiredMemorySegments) {
+		this(networkBufferPool, numberOfRequiredMemorySegments, Integer.MAX_VALUE);
+	}
+
+	/**
+	 * Local buffer pool based on the given <tt>networkBufferPool</tt> with a minimal and maximal
+	 * number of network buffers being available.
+	 *
+	 * @param networkBufferPool
+	 * 		global network buffer pool to get buffers from
+	 * @param numberOfRequiredMemorySegments
+	 * 		minimum number of network buffers
+	 * @param maxNumberOfMemorySegments
+	 * 		maximum number of network buffers to allocate
+	 */
+	LocalBufferPool(NetworkBufferPool networkBufferPool, int numberOfRequiredMemorySegments,
+			int maxNumberOfMemorySegments) {
+		checkArgument(maxNumberOfMemorySegments >= numberOfRequiredMemorySegments,
+			"Maximum number of memory segments (%s) should not be smaller than minimum (%s).",
+			maxNumberOfMemorySegments, numberOfRequiredMemorySegments);
+
+		checkArgument(maxNumberOfMemorySegments > 0,
+			"Maximum number of memory segments (%s) should be larger than 0.",
+			maxNumberOfMemorySegments);
+
+		LOG.debug("Using a local buffer pool with {}-{} buffers",
+			numberOfRequiredMemorySegments, maxNumberOfMemorySegments);
+
 		this.networkBufferPool = networkBufferPool;
 		this.numberOfRequiredMemorySegments = numberOfRequiredMemorySegments;
 		this.currentPoolSize = numberOfRequiredMemorySegments;
+		this.maxNumberOfMemorySegments = maxNumberOfMemorySegments;
 	}
 
 	// ------------------------------------------------------------------------
@@ -113,6 +147,11 @@ class LocalBufferPool implements BufferPool {
 	}
 
 	@Override
+	public int getMaxNumberOfMemorySegments() {
+		return maxNumberOfMemorySegments;
+	}
+
+	@Override
 	public int getNumberOfAvailableMemorySegments() {
 		synchronized (availableMemorySegments) {
 			return availableMemorySegments.size();
@@ -160,6 +199,7 @@ class LocalBufferPool implements BufferPool {
 
 			boolean askToRecycle = owner != null;
 
+			// fill availableMemorySegments with at least one element, wait if required
 			while (availableMemorySegments.isEmpty()) {
 				if (isDestroyed) {
 					throw new IllegalStateException("Buffer pool is destroyed.");
@@ -257,9 +297,14 @@ class LocalBufferPool implements BufferPool {
 	@Override
 	public void setNumBuffers(int numBuffers) throws IOException {
 		synchronized (availableMemorySegments) {
-			checkArgument(numBuffers >= numberOfRequiredMemorySegments, "Buffer pool needs at least " + numberOfRequiredMemorySegments + " buffers, but tried to set to " + numBuffers + ".");
+			checkArgument(numBuffers >= numberOfRequiredMemorySegments, "Buffer pool needs at least " +
+				numberOfRequiredMemorySegments + " buffers, but tried to set to " + numBuffers + ".");
 
-			currentPoolSize = numBuffers;
+			if (numBuffers > maxNumberOfMemorySegments) {
+				currentPoolSize = maxNumberOfMemorySegments;
+			} else {
+				currentPoolSize = numBuffers;
+			}
 
 			returnExcessMemorySegments();
 
@@ -274,7 +319,10 @@ class LocalBufferPool implements BufferPool {
 	@Override
 	public String toString() {
 		synchronized (availableMemorySegments) {
-			return String.format("[size: %d, required: %d, requested: %d, available: %d, listeners: %d, destroyed: %s]", currentPoolSize, numberOfRequiredMemorySegments, numberOfRequestedMemorySegments, availableMemorySegments.size(), registeredListeners.size(), isDestroyed);
+			return String.format(
+				"[size: %d, required: %d, requested: %d, available: %d, max: %d, listeners: %d, destroyed: %s]",
+				currentPoolSize, numberOfRequiredMemorySegments, numberOfRequestedMemorySegments,
+				availableMemorySegments.size(), maxNumberOfMemorySegments, registeredListeners.size(), isDestroyed);
 		}
 	}
 
@@ -296,8 +344,7 @@ class LocalBufferPool implements BufferPool {
 				return;
 			}
 
-			networkBufferPool.recycle(segment);
-			numberOfRequestedMemorySegments--;
+			returnMemorySegment(segment);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 5345fbb..7668759 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -180,7 +180,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public BufferPool createBufferPool(int numRequiredBuffers) throws IOException {
+	public BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers) throws IOException {
 		// It is necessary to use a separate lock from the one used for buffer
 		// requests to ensure deadlock freedom for failure cases.
 		synchronized (factoryLock) {
@@ -205,7 +205,8 @@ public class NetworkBufferPool implements BufferPoolFactory {
 
 			// We are good to go, create a new buffer pool and redistribute
 			// non-fixed size buffers.
-			LocalBufferPool localBufferPool = new LocalBufferPool(this, numRequiredBuffers);
+			LocalBufferPool localBufferPool =
+				new LocalBufferPool(this, numRequiredBuffers, maxUsedBuffers);
 
 			allBufferPools.add(localBufferPool);
 
@@ -236,7 +237,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
 
 	/**
 	 * Destroys all buffer pools that allocate their buffers from this
-	 * buffer pool (created via {@link #createBufferPool(int)}).
+	 * buffer pool (created via {@link #createBufferPool(int, int)}).
 	 */
 	public void destroyAllBufferPools() {
 		synchronized (factoryLock) {
@@ -258,27 +259,60 @@ public class NetworkBufferPool implements BufferPoolFactory {
 	private void redistributeBuffers() throws IOException {
 		assert Thread.holdsLock(factoryLock);
 
-		int numManagedBufferPools = allBufferPools.size();
+		// All buffers, which are not among the required ones
+		int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
 
-		if (numManagedBufferPools == 0) {
-			return; // necessary to avoid div by zero when no managed pools
+		if (numAvailableMemorySegment == 0) {
+			// in this case, we need to redistribute buffers so that every pool gets its minimum
+			for (LocalBufferPool bufferPool : allBufferPools) {
+				bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments());
+			}
+			return;
 		}
 
-		// All buffers, which are not among the required ones
-		int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
+		/**
+		 * With buffer pools being potentially limited, let's distribute the available memory
+		 * segments based on the capacity of each buffer pool, i.e. the maximum number of segments
+		 * an unlimited buffer pool can take is numAvailableMemorySegment, for limited buffer pools
+		 * it may be less. Based on this and the sum of all these values (totalCapacity), we build
+		 * a ratio that we use to distribute the buffers.
+		 */
 
-		// Available excess (not required) buffers per pool
-		int numExcessBuffersPerPool = numAvailableMemorySegment / numManagedBufferPools;
+		int totalCapacity = 0;
+		for (LocalBufferPool bufferPool : allBufferPools) {
+			int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
+				bufferPool.getNumberOfRequiredMemorySegments();
+			totalCapacity += Math.min(numAvailableMemorySegment, excessMax);
+		}
 
-		// Distribute leftover buffers in round robin fashion
-		int numLeftoverBuffers = numAvailableMemorySegment % numManagedBufferPools;
+		// no capacity to receive additional buffers?
+		if (totalCapacity == 0) {
+			return; // necessary to avoid div by zero when nothing to re-distribute
+		}
 
-		int bufferPoolIndex = 0;
+		int memorySegmentsToDistribute = Math.min(numAvailableMemorySegment, totalCapacity);
 
+		int totalPartsUsed = 0; // of totalCapacity
+		int numDistributedMemorySegment = 0;
 		for (LocalBufferPool bufferPool : allBufferPools) {
-			int leftoverBuffers = bufferPoolIndex++ < numLeftoverBuffers ? 1 : 0;
+			int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
+				bufferPool.getNumberOfRequiredMemorySegments();
+
+			// shortcut
+			if (excessMax == 0) {
+				continue;
+			}
+
+			totalPartsUsed += Math.min(numAvailableMemorySegment, excessMax);
 
-			bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + numExcessBuffersPerPool + leftoverBuffers);
+			// avoid remaining buffers by looking at the total capacity that should have been
+			// re-distributed up until here
+			int mySize = memorySegmentsToDistribute * totalPartsUsed / totalCapacity - numDistributedMemorySegment;
+			numDistributedMemorySegment += mySize;
+			bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + mySize);
 		}
+
+		assert (totalPartsUsed == totalCapacity);
+		assert (numDistributedMemorySegment == memorySegmentsToDistribute);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
index 36c1234..6f56877 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
@@ -67,7 +67,7 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes
 	public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
 		super.channelRegistered(ctx);
 
-		bufferPool = networkBufferPool.createBufferPool(1);
+		bufferPool = networkBufferPool.createBufferPool(1, Integer.MAX_VALUE);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 3d92584..eb1418b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -160,6 +160,7 @@ public class ResultPartition implements BufferPoolOwner {
 				break;
 
 			case PIPELINED:
+			case PIPELINED_BOUNDED:
 				for (int i = 0; i < subpartitions.length; i++) {
 					subpartitions[i] = new PipelinedSubpartition(i, this);
 				}
@@ -237,6 +238,15 @@ public class ResultPartition implements BufferPoolOwner {
 		return totalBuffers;
 	}
 
+	/**
+	 * Returns the type of this result partition.
+	 *
+	 * @return result partition type
+	 */
+	public ResultPartitionType getPartitionType() {
+		return partitionType;
+	}
+
 	// ------------------------------------------------------------------------
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
index 43d3a52..256387c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java
@@ -20,9 +20,22 @@ package org.apache.flink.runtime.io.network.partition;
 
 public enum ResultPartitionType {
 
-	BLOCKING(false, false),
+	BLOCKING(false, false, false),
 
-	PIPELINED(true, true);
+	PIPELINED(true, true, false),
+
+	/**
+	 * Pipelined partitions with a bounded (local) buffer pool.
+	 *
+	 * For streaming jobs, a fixed limit on the buffer pool size should help avoid that too much
+	 * data is being buffered and checkpoint barriers are delayed. In contrast to limiting the
+	 * overall network buffer pool size, this, however, still allows to be flexible with regards
+	 * to the total number of partitions by selecting an appropriately big network buffer pool size.
+	 *
+	 * For batch jobs, it will be best to keep this unlimited ({@link #PIPELINED}) since there are
+	 * no checkpoint barriers.
+	 */
+	PIPELINED_BOUNDED(true, true, true);
 
 	/** Can the partition be consumed while being produced? */
 	private final boolean isPipelined;
@@ -30,12 +43,16 @@ public enum ResultPartitionType {
 	/** Does the partition produce back pressure when not consumed? */
 	private final boolean hasBackPressure;
 
+	/** Does this partition use a limited number of (network) buffers? */
+	private final boolean isBounded;
+
 	/**
 	 * Specifies the behaviour of an intermediate result partition at runtime.
 	 */
-	ResultPartitionType(boolean isPipelined, boolean hasBackPressure) {
+	ResultPartitionType(boolean isPipelined, boolean hasBackPressure, boolean isBounded) {
 		this.isPipelined = isPipelined;
 		this.hasBackPressure = hasBackPressure;
+		this.isBounded = isBounded;
 	}
 
 	public boolean hasBackPressure() {
@@ -49,4 +66,13 @@ public enum ResultPartitionType {
 	public boolean isPipelined() {
 		return isPipelined;
 	}
+
+	/**
+	 * Whether this partition uses a limited number of (network) buffers or not.
+	 *
+	 * @return <tt>true</tt> if the number of buffers should be bound to some limit
+	 */
+	public boolean isBounded() {
+		return isBounded;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index d546559..afe8722 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -115,6 +116,9 @@ public class SingleInputGate implements InputGate {
 	 */
 	private final IntermediateDataSetID consumedResultId;
 
+	/** The type of the partition the input gate is consuming. */
+	private final ResultPartitionType consumedPartitionType;
+
 	/**
 	 * The index of the consumed subpartition of each consumed partition. This index depends on the
 	 * {@link DistributionPattern} and the subtask indices of the producing and consuming task.
@@ -166,6 +170,7 @@ public class SingleInputGate implements InputGate {
 		String owningTaskName,
 		JobID jobId,
 		IntermediateDataSetID consumedResultId,
+		final ResultPartitionType consumedPartitionType,
 		int consumedSubpartitionIndex,
 		int numberOfInputChannels,
 		TaskActions taskActions,
@@ -175,6 +180,7 @@ public class SingleInputGate implements InputGate {
 		this.jobId = checkNotNull(jobId);
 
 		this.consumedResultId = checkNotNull(consumedResultId);
+		this.consumedPartitionType = checkNotNull(consumedPartitionType);
 
 		checkArgument(consumedSubpartitionIndex >= 0);
 		this.consumedSubpartitionIndex = consumedSubpartitionIndex;
@@ -201,6 +207,15 @@ public class SingleInputGate implements InputGate {
 		return consumedResultId;
 	}
 
+	/**
+	 * Returns the type of this input channel's consumed result partition.
+	 *
+	 * @return consumed result partition type
+	 */
+	public ResultPartitionType getConsumedPartitionType() {
+		return consumedPartitionType;
+	}
+
 	BufferProvider getBufferProvider() {
 		return bufferPool;
 	}
@@ -571,6 +586,7 @@ public class SingleInputGate implements InputGate {
 		TaskIOMetricGroup metrics) {
 
 		final IntermediateDataSetID consumedResultId = checkNotNull(igdd.getConsumedResultId());
+		final ResultPartitionType consumedPartitionType = checkNotNull(igdd.getConsumedPartitionType());
 
 		final int consumedSubpartitionIndex = igdd.getConsumedSubpartitionIndex();
 		checkArgument(consumedSubpartitionIndex >= 0);
@@ -578,7 +594,7 @@ public class SingleInputGate implements InputGate {
 		final InputChannelDeploymentDescriptor[] icdd = checkNotNull(igdd.getInputChannelDeploymentDescriptors());
 
 		final SingleInputGate inputGate = new SingleInputGate(
-			owningTaskName, jobId, consumedResultId, consumedSubpartitionIndex,
+			owningTaskName, jobId, consumedResultId, consumedPartitionType, consumedSubpartitionIndex,
 			icdd.length, taskActions, metrics);
 
 		// Create the input channels. There is one input channel for each consumed partition.

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 19c5c01..e3c8345 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -370,7 +370,9 @@ public class TaskManagerServices {
 			kvStateServer,
 			networkEnvironmentConfiguration.ioMode(),
 			networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
-			networkEnvironmentConfiguration.partitionRequestMaxBackoff());
+			networkEnvironmentConfiguration.partitionRequestMaxBackoff(),
+			networkEnvironmentConfiguration.networkBuffersPerChannel(),
+			networkEnvironmentConfiguration.extraNetworkBuffersPerGate());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 2c76372..8ad318a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.HeapMemorySegment;
 import org.apache.flink.core.memory.HybridMemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
@@ -311,13 +312,25 @@ public class TaskManagerServicesConfiguration {
 			ioMode = IOManager.IOMode.SYNC;
 		}
 
+		int initialRequestBackoff = configuration.getInteger(
+			TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+		int maxRequestBackoff = configuration.getInteger(
+			TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+		int buffersPerChannel = configuration.getInteger(
+			TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+		int extraBuffersPerGate = configuration.getInteger(
+			TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
 		return new NetworkEnvironmentConfiguration(
 			numNetworkBuffers,
 			pageSize,
 			memType,
 			ioMode,
-			500,
-			3000,
+			initialRequestBackoff,
+			maxRequestBackoff,
+			buffersPerChannel,
+			extraBuffersPerGate,
 			nettyConfig);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
index 6c7ca3e..4ecfe59 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
@@ -29,4 +29,6 @@ case class NetworkEnvironmentConfiguration(
     ioMode: IOMode,
     partitionRequestInitialBackoff : Int,
     partitionRequestMaxBackoff : Int,
+    networkBuffersPerChannel: Int,
+    extraNetworkBuffersPerGate: Int,
     nettyConfig: NettyConfig = null)

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java
index dcdf44c..3bbe6d5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/MockNetworkEnvironment.java
@@ -36,7 +36,7 @@ public class MockNetworkEnvironment {
 
 	static {
 		try {
-			when(networkBufferPool.createBufferPool(anyInt())).thenReturn(mock(BufferPool.class));
+			when(networkBufferPool.createBufferPool(anyInt(), anyInt())).thenReturn(mock(BufferPool.class));
 			when(networkEnvironment.getNetworkBufferPool()).thenReturn(networkBufferPool);
 
 			when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
new file mode 100644
index 0000000..b956691
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Various tests for the {@link NetworkEnvironment} class.
+ */
+public class NetworkEnvironmentTest {
+	private final static int numBuffers = 1024;
+
+	private final static int memorySegmentSize = 128;
+
+	/**
+	 * Verifies that {@link NetworkEnvironment#registerTask(Task)} sets up (un)bounded buffer pool
+	 * instances for various types of input and output channels.
+	 */
+	@Test
+	public void testRegisterTaskUsesBoundedBuffers() throws Exception {
+
+		final NetworkEnvironment network = new NetworkEnvironment(
+			new NetworkBufferPool(numBuffers, memorySegmentSize, MemoryType.HEAP),
+			new LocalConnectionManager(),
+			new ResultPartitionManager(),
+			new TaskEventDispatcher(),
+			new KvStateRegistry(),
+			null,
+			IOManager.IOMode.SYNC,
+			0,
+			0,
+			2,
+			8);
+
+		// result partitions
+		ResultPartition rp1 = createResultPartition(ResultPartitionType.PIPELINED, 2);
+		ResultPartition rp2 = createResultPartition(ResultPartitionType.BLOCKING, 2);
+		ResultPartition rp3 = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 2);
+		ResultPartition rp4 = createResultPartition(ResultPartitionType.PIPELINED_BOUNDED, 8);
+		final ResultPartition[] resultPartitions = new ResultPartition[] {rp1, rp2, rp3, rp4};
+		final ResultPartitionWriter[] resultPartitionWriters = new ResultPartitionWriter[] {
+			new ResultPartitionWriter(rp1), new ResultPartitionWriter(rp2),
+			new ResultPartitionWriter(rp3), new ResultPartitionWriter(rp4)};
+
+		// input gates
+		final SingleInputGate[] inputGates = new SingleInputGate[] {
+			createSingleInputGateMock(ResultPartitionType.PIPELINED, 2),
+			createSingleInputGateMock(ResultPartitionType.BLOCKING, 2),
+			createSingleInputGateMock(ResultPartitionType.PIPELINED_BOUNDED, 2),
+			createSingleInputGateMock(ResultPartitionType.PIPELINED_BOUNDED, 8)};
+
+		// overall task to register
+		Task task = mock(Task.class);
+		when(task.getProducedPartitions()).thenReturn(resultPartitions);
+		when(task.getAllWriters()).thenReturn(resultPartitionWriters);
+		when(task.getAllInputGates()).thenReturn(inputGates);
+
+		network.registerTask(task);
+
+		assertEquals(Integer.MAX_VALUE, rp1.getBufferPool().getMaxNumberOfMemorySegments());
+		assertEquals(Integer.MAX_VALUE, rp2.getBufferPool().getMaxNumberOfMemorySegments());
+		assertEquals(2 * 2 + 8, rp3.getBufferPool().getMaxNumberOfMemorySegments());
+		assertEquals(8 * 2 + 8, rp4.getBufferPool().getMaxNumberOfMemorySegments());
+
+		network.shutdown();
+	}
+
+	/**
+	 * Helper to create simple {@link ResultPartition} instance for use by a {@link Task} inside
+	 * {@link NetworkEnvironment#registerTask(Task)}.
+	 *
+	 * @param partitionType
+	 * 		the produced partition type
+	 * @param channels
+	 * 		the nummer of output channels
+	 *
+	 * @return instance with minimal data set and some mocks so that it is useful for {@link
+	 * NetworkEnvironment#registerTask(Task)}
+	 */
+	private static ResultPartition createResultPartition(
+			final ResultPartitionType partitionType, final int channels) {
+		return new ResultPartition(
+			"TestTask-" + partitionType + ":" + channels,
+			mock(TaskActions.class),
+			new JobID(),
+			new ResultPartitionID(),
+			partitionType,
+			channels,
+			channels,
+			mock(ResultPartitionManager.class),
+			mock(ResultPartitionConsumableNotifier.class),
+			mock(IOManager.class),
+			false);
+	}
+
+	/**
+	 * Helper to create a mock of a {@link SingleInputGate} for use by a {@link Task} inside
+	 * {@link NetworkEnvironment#registerTask(Task)}.
+	 *
+	 * @param partitionType
+	 * 		the consumed partition type
+	 * @param channels
+	 * 		the nummer of input channels
+	 *
+	 * @return mock with minimal functionality necessary by {@link NetworkEnvironment#registerTask(Task)}
+	 */
+	private static SingleInputGate createSingleInputGateMock(
+			final ResultPartitionType partitionType, final int channels) {
+		SingleInputGate ig = mock(SingleInputGate.class);
+		when(ig.getConsumedPartitionType()).thenReturn(partitionType);
+		when(ig.getNumberOfInputChannels()).thenReturn(channels);
+		doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(final InvocationOnMock invocation) throws Throwable {
+				BufferPool bp = invocation.getArgumentAt(0, BufferPool.class);
+				if (partitionType == ResultPartitionType.PIPELINED_BOUNDED) {
+					assertEquals(channels * 2 + 8, bp.getMaxNumberOfMemorySegments());
+				} else {
+					assertEquals(Integer.MAX_VALUE, bp.getMaxNumberOfMemorySegments());
+				}
+				return null;
+			}
+		}).when(ig).setBufferPool(any(BufferPool.class));
+		return ig;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index ca1d0a5..7d83fb5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -177,7 +177,7 @@ public class RecordWriterTest {
 
 		try {
 			buffers = new NetworkBufferPool(1, 1024, MemoryType.HEAP);
-			bufferPool = spy(buffers.createBufferPool(1));
+			bufferPool = spy(buffers.createBufferPool(1, Integer.MAX_VALUE));
 
 			ResultPartitionWriter partitionWriter = mock(ResultPartitionWriter.class);
 			when(partitionWriter.getBufferProvider()).thenReturn(checkNotNull(bufferPool));

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
index 49808c9..ce76a6d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.fail;
 
 public class BufferPoolFactoryTest {
@@ -48,61 +49,190 @@ public class BufferPoolFactoryTest {
 	public void verifyAllBuffersReturned() {
 		String msg = "Did not return all buffers to network buffer pool after test.";
 		assertEquals(msg, numBuffers, networkBufferPool.getNumberOfAvailableMemorySegments());
+		// in case buffers have actually been requested, we must release them again
+		networkBufferPool.destroy();
 	}
 
 	@Test(expected = IOException.class)
 	public void testRequireMoreThanPossible() throws IOException {
-		networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() * 2);
+		networkBufferPool.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() * 2, Integer.MAX_VALUE);
+	}
+
+	@Test
+	public void testBoundedPools() throws IOException {
+		BufferPool lbp = networkBufferPool.createBufferPool(1, 1);
+		assertEquals(1, lbp.getNumBuffers());
+
+		lbp = networkBufferPool.createBufferPool(1, 2);
+		assertEquals(2, lbp.getNumBuffers());
 	}
 
 	@Test
 	public void testSingleManagedPoolGetsAll() throws IOException {
-		BufferPool lbp = networkBufferPool.createBufferPool(1);
+		BufferPool lbp = networkBufferPool.createBufferPool(1, Integer.MAX_VALUE);
 
 		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments(), lbp.getNumBuffers());
 	}
 
 	@Test
+	public void testSingleManagedPoolGetsAllExceptFixedOnes() throws IOException {
+		BufferPool fixed = networkBufferPool.createBufferPool(24, 24);
+
+		BufferPool lbp = networkBufferPool.createBufferPool(1, Integer.MAX_VALUE);
+
+		assertEquals(24, fixed.getNumBuffers());
+		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() - fixed.getNumBuffers(), lbp.getNumBuffers());
+	}
+
+	@Test
 	public void testUniformDistribution() throws IOException {
-		BufferPool first = networkBufferPool.createBufferPool(0);
-		BufferPool second = networkBufferPool.createBufferPool(0);
+		BufferPool first = networkBufferPool.createBufferPool(0, Integer.MAX_VALUE);
+		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments(), first.getNumBuffers());
 
+		BufferPool second = networkBufferPool.createBufferPool(0, Integer.MAX_VALUE);
 		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2, first.getNumBuffers());
 		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2, second.getNumBuffers());
 	}
 
+	/**
+	 * Tests that buffers, once given to an initial buffer pool, get re-distributed to a second one
+	 * in case both buffer pools request half of the available buffer count.
+	 */
 	@Test
-	public void testAllDistributed() {
-		Random random = new Random();
+	public void testUniformDistributionAllBuffers() throws IOException {
+		BufferPool first = networkBufferPool
+			.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() / 2, Integer.MAX_VALUE);
+		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments(), first.getNumBuffers());
 
-		try {
-			List<BufferPool> pools = new ArrayList<BufferPool>();
+		BufferPool second = networkBufferPool
+			.createBufferPool(networkBufferPool.getTotalNumberOfMemorySegments() / 2, Integer.MAX_VALUE);
+		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2, first.getNumBuffers());
+		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2,
+			second.getNumBuffers());
+	}
 
-			int numPools = numBuffers / 32;
-			for (int i = 0; i < numPools; i++) {
-				pools.add(networkBufferPool.createBufferPool(random.nextInt(7 + 1)));
+	@Test
+	public void testUniformDistributionBounded1() throws IOException {
+		BufferPool first = networkBufferPool.createBufferPool(0, networkBufferPool.getTotalNumberOfMemorySegments());
+		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments(), first.getNumBuffers());
+
+		BufferPool second = networkBufferPool.createBufferPool(0, networkBufferPool.getTotalNumberOfMemorySegments());
+		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2, first.getNumBuffers());
+		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2, second.getNumBuffers());
+	}
+
+	@Test
+	public void testUniformDistributionBounded2() throws IOException {
+		BufferPool first = networkBufferPool.createBufferPool(0, 10);
+		assertEquals(10, first.getNumBuffers());
+
+		BufferPool second = networkBufferPool.createBufferPool(0, 10);
+		assertEquals(10, first.getNumBuffers());
+		assertEquals(10, second.getNumBuffers());
+	}
+
+	@Test
+	public void testUniformDistributionBounded3() throws IOException {
+		NetworkBufferPool globalPool = new NetworkBufferPool(3, 128, MemoryType.HEAP);
+		BufferPool first = globalPool.createBufferPool(0, 10);
+		assertEquals(3, first.getNumBuffers());
+
+		BufferPool second = globalPool.createBufferPool(0, 10);
+		// the order of which buffer pool received 2 or 1 buffer is undefined
+		assertEquals(3, first.getNumBuffers() + second.getNumBuffers());
+		assertNotEquals(3, first.getNumBuffers());
+		assertNotEquals(3, second.getNumBuffers());
+
+		BufferPool third = globalPool.createBufferPool(0, 10);
+		assertEquals(1, first.getNumBuffers());
+		assertEquals(1, second.getNumBuffers());
+		assertEquals(1, third.getNumBuffers());
+
+		// similar to #verifyAllBuffersReturned()
+		String msg = "Did not return all buffers to network buffer pool after test.";
+		assertEquals(msg, 3, globalPool.getNumberOfAvailableMemorySegments());
+		// in case buffers have actually been requested, we must release them again
+		globalPool.destroy();
+	}
+
+	@Test
+	public void testBufferRedistributionMixed1() throws IOException {
+		// try multiple times for various orders during redistribution
+		for (int i = 0; i < 1_000; ++i) {
+			BufferPool first = networkBufferPool.createBufferPool(0, 10);
+			assertEquals(10, first.getNumBuffers());
+
+			BufferPool second = networkBufferPool.createBufferPool(0, 10);
+			assertEquals(10, first.getNumBuffers());
+			assertEquals(10, second.getNumBuffers());
+
+			BufferPool third = networkBufferPool.createBufferPool(0, Integer.MAX_VALUE);
+			// note: exact buffer distribution depends on the order during the redistribution
+			for (BufferPool bp : new BufferPool[] {first, second, third}) {
+				int size = networkBufferPool.getTotalNumberOfMemorySegments() *
+					Math.min(networkBufferPool.getTotalNumberOfMemorySegments(),
+						bp.getMaxNumberOfMemorySegments()) /
+					(networkBufferPool.getTotalNumberOfMemorySegments() + 20);
+				if (bp.getNumBuffers() != size && bp.getNumBuffers() != (size + 1)) {
+					fail("wrong buffer pool size after redistribution: " + bp.getNumBuffers());
+				}
 			}
 
-			int numDistributedBuffers = 0;
-			for (BufferPool pool : pools) {
-				numDistributedBuffers += pool.getNumBuffers();
+			BufferPool fourth = networkBufferPool.createBufferPool(0, Integer.MAX_VALUE);
+			// note: exact buffer distribution depends on the order during the redistribution
+			for (BufferPool bp : new BufferPool[] {first, second, third, fourth}) {
+				int size = networkBufferPool.getTotalNumberOfMemorySegments() *
+					Math.min(networkBufferPool.getTotalNumberOfMemorySegments(),
+						bp.getMaxNumberOfMemorySegments()) /
+					(2 * networkBufferPool.getTotalNumberOfMemorySegments() + 20);
+				if (bp.getNumBuffers() != size && bp.getNumBuffers() != (size + 1)) {
+					fail("wrong buffer pool size after redistribution: " + bp.getNumBuffers());
+				}
 			}
 
-			assertEquals(numBuffers, numDistributedBuffers);
+			verifyAllBuffersReturned();
+			setupNetworkBufferPool();
 		}
-		catch (Throwable t) {
-			t.printStackTrace();
-			fail(t.getMessage());
+	}
+
+	@Test
+	public void testAllDistributed() throws IOException {
+		// try multiple times for various orders during redistribution
+		for (int i = 0; i < 1_000; ++i) {
+			Random random = new Random();
+
+			List<BufferPool> pools = new ArrayList<BufferPool>();
+
+			int numPools = numBuffers / 32;
+			long maxTotalUsed = 0;
+			for (int j = 0; j < numPools; j++) {
+				int numRequiredBuffers = random.nextInt(7 + 1);
+				// make unbounded buffers more likely:
+				int maxUsedBuffers = random.nextBoolean() ? Integer.MAX_VALUE :
+					Math.max(1, random.nextInt(10) + numRequiredBuffers);
+				pools.add(networkBufferPool.createBufferPool(numRequiredBuffers, maxUsedBuffers));
+				maxTotalUsed = Math.min(numBuffers, maxTotalUsed + maxUsedBuffers);
+
+				// after every iteration, all buffers (up to maxTotalUsed) must be distributed
+				int numDistributedBuffers = 0;
+				for (BufferPool pool : pools) {
+					numDistributedBuffers += pool.getNumBuffers();
+				}
+				assertEquals(maxTotalUsed, numDistributedBuffers);
+			}
+
+			verifyAllBuffersReturned();
+			setupNetworkBufferPool();
 		}
 	}
 
 	@Test
 	public void testCreateDestroy() throws IOException {
-		BufferPool first = networkBufferPool.createBufferPool(0);
+		BufferPool first = networkBufferPool.createBufferPool(0, Integer.MAX_VALUE);
 
 		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments(), first.getNumBuffers());
 
-		BufferPool second = networkBufferPool.createBufferPool(0);
+		BufferPool second = networkBufferPool.createBufferPool(0, Integer.MAX_VALUE);
 
 		assertEquals(networkBufferPool.getTotalNumberOfMemorySegments() / 2, first.getNumBuffers());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index 93731e1..a186d56 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -328,6 +328,64 @@ public class LocalBufferPoolTest {
 		}
 	}
 
+	@Test
+	public void testBoundedBuffer() throws Exception {
+		localBufferPool.lazyDestroy();
+
+		localBufferPool = new LocalBufferPool(networkBufferPool, 1, 2);
+		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+		assertEquals(2, localBufferPool.getMaxNumberOfMemorySegments());
+
+		Buffer buffer1, buffer2;
+
+		// check min number of buffers:
+		localBufferPool.setNumBuffers(1);
+		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+		assertNotNull(buffer1 = localBufferPool.requestBuffer());
+		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+		assertNull(localBufferPool.requestBuffer());
+		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+		buffer1.recycle();
+		assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
+
+		// check max number of buffers:
+		localBufferPool.setNumBuffers(2);
+		assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
+		assertNotNull(buffer1 = localBufferPool.requestBuffer());
+		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+		assertNotNull(buffer2 = localBufferPool.requestBuffer());
+		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+		assertNull(localBufferPool.requestBuffer());
+		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+		buffer1.recycle();
+		assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
+		buffer2.recycle();
+		assertEquals(2, localBufferPool.getNumberOfAvailableMemorySegments());
+
+		// try to set too large buffer size:
+		localBufferPool.setNumBuffers(3);
+		assertEquals(2, localBufferPool.getNumberOfAvailableMemorySegments());
+		assertNotNull(buffer1 = localBufferPool.requestBuffer());
+		assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
+		assertNotNull(buffer2 = localBufferPool.requestBuffer());
+		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+		assertNull(localBufferPool.requestBuffer());
+		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+		buffer1.recycle();
+		assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
+		buffer2.recycle();
+		assertEquals(2, localBufferPool.getNumberOfAvailableMemorySegments());
+
+		// decrease size again
+		localBufferPool.setNumBuffers(1);
+		assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
+		assertNotNull(buffer1 = localBufferPool.requestBuffer());
+		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
+		assertNull(localBufferPool.requestBuffer());
+		buffer1.recycle();
+		assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
+	}
+
 	// ------------------------------------------------------------------------
 	// Helpers
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index ab32685..7c6a543 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.io.network.buffer;
 import org.apache.flink.core.memory.MemoryType;
 import org.junit.Test;
 
+import java.util.ArrayList;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -47,7 +49,23 @@ public class NetworkBufferPoolTest {
 			assertTrue(globalPool.isDestroyed());
 
 			try {
-				globalPool.createBufferPool(2);
+				globalPool.createBufferPool(2, 2);
+				fail("Should throw an IllegalStateException");
+			}
+			catch (IllegalStateException e) {
+				// yippie!
+			}
+
+			try {
+				globalPool.createBufferPool(2, 10);
+				fail("Should throw an IllegalStateException");
+			}
+			catch (IllegalStateException e) {
+				// yippie!
+			}
+
+			try {
+				globalPool.createBufferPool(2, Integer.MAX_VALUE);
 				fail("Should throw an IllegalStateException");
 			}
 			catch (IllegalStateException e) {
@@ -60,39 +78,51 @@ public class NetworkBufferPoolTest {
 		}
 
 	}
+
 	@Test
 	public void testDestroyAll() {
 		try {
-			NetworkBufferPool globalPool = new NetworkBufferPool(8, 128, MemoryType.HEAP);
-
-			BufferPool lbp = globalPool.createBufferPool(5);
-
-			assertEquals(5, lbp.getNumberOfRequiredMemorySegments());
-
-			Buffer[] buffers = {
-					lbp.requestBuffer(),
-					lbp.requestBuffer(),
-					lbp.requestBuffer(),
-					lbp.requestBuffer(),
-					lbp.requestBuffer(),
-					lbp.requestBuffer(),
-					lbp.requestBuffer(),
-					lbp.requestBuffer()
-			};
-
-			for (Buffer b : buffers) {
-				assertNotNull(b);
-				assertNotNull(b.getMemorySegment());
+			NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, MemoryType.HEAP);
+
+			BufferPool fixedPool = globalPool.createBufferPool(2, 2);
+			BufferPool boundedPool = globalPool.createBufferPool(0, 1);
+			BufferPool nonFixedPool = globalPool.createBufferPool(5, Integer.MAX_VALUE);
+
+			assertEquals(2, fixedPool.getNumberOfRequiredMemorySegments());
+			assertEquals(0, boundedPool.getNumberOfRequiredMemorySegments());
+			assertEquals(5, nonFixedPool.getNumberOfRequiredMemorySegments());
+
+			// actually, the buffer pool sizes may be different due to rounding and based on the internal order of
+			// the buffer pools - the total number of retrievable buffers should be equal to the number of buffers
+			// in the NetworkBufferPool though
+
+			ArrayList<Buffer> buffers = new ArrayList<>(globalPool.getTotalNumberOfMemorySegments());
+			collectBuffers:
+			for (int i = 0; i < 10; ++i) {
+				for (BufferPool bp : new BufferPool[] { fixedPool, boundedPool, nonFixedPool }) {
+					Buffer buffer = bp.requestBuffer();
+					if (buffer != null) {
+						assertNotNull(buffer.getMemorySegment());
+						buffers.add(buffer);
+						continue collectBuffers;
+					}
+				}
 			}
 
-			assertNull(lbp.requestBuffer());
+			assertEquals(globalPool.getTotalNumberOfMemorySegments(), buffers.size());
+
+			assertNull(fixedPool.requestBuffer());
+			assertNull(boundedPool.requestBuffer());
+			assertNull(nonFixedPool.requestBuffer());
 
 			// destroy all allocated ones
 			globalPool.destroyAllBufferPools();
 
 			// check the destroyed status
 			assertFalse(globalPool.isDestroyed());
-			assertTrue(lbp.isDestroyed());
+			assertTrue(fixedPool.isDestroyed());
+			assertTrue(boundedPool.isDestroyed());
+			assertTrue(nonFixedPool.isDestroyed());
 
 			assertEquals(0, globalPool.getNumberOfRegisteredBufferPools());
 
@@ -107,7 +137,23 @@ public class NetworkBufferPoolTest {
 
 			// can request no more buffers
 			try {
-				lbp.requestBuffer();
+				fixedPool.requestBuffer();
+				fail("Should fail with an IllegalStateException");
+			}
+			catch (IllegalStateException e) {
+				// yippie!
+			}
+
+			try {
+				boundedPool.requestBuffer();
+				fail("Should fail with an IllegalStateException");
+			}
+			catch (IllegalStateException e) {
+				// that's the way we like it, aha, aha
+			}
+
+			try {
+				nonFixedPool.requestBuffer();
 				fail("Should fail with an IllegalStateException");
 			}
 			catch (IllegalStateException e) {
@@ -115,7 +161,7 @@ public class NetworkBufferPoolTest {
 			}
 
 			// can create a new pool now
-			assertNotNull(globalPool.createBufferPool(8));
+			assertNotNull(globalPool.createBufferPool(10, Integer.MAX_VALUE));
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
index 27177c9..64f82f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
@@ -60,7 +60,7 @@ public class InputGateConcurrentTest {
 		final SingleInputGate gate = new SingleInputGate(
 				"Test Task Name",
 				new JobID(),
-				new IntermediateDataSetID(),
+				new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
 				0, numChannels,
 				mock(TaskActions.class),
 				new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
@@ -95,7 +95,7 @@ public class InputGateConcurrentTest {
 		final SingleInputGate gate = new SingleInputGate(
 				"Test Task Name",
 				new JobID(),
-				new IntermediateDataSetID(),
+				new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
 				0,
 				numChannels,
 				mock(TaskActions.class),
@@ -144,7 +144,7 @@ public class InputGateConcurrentTest {
 		final SingleInputGate gate = new SingleInputGate(
 				"Test Task Name",
 				new JobID(),
-				new IntermediateDataSetID(),
+				new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
 				0,
 				numChannels,
 				mock(TaskActions.class),

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index 7e1d792..f933840 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -347,7 +347,8 @@ public class InputGateFairnessTest {
 				TaskActions taskActions,
 				TaskIOMetricGroup metrics) {
 
-			super(owningTaskName, jobId, consumedResultId, consumedSubpartitionIndex,
+			super(owningTaskName, jobId, consumedResultId, ResultPartitionType.PIPELINED,
+				consumedSubpartitionIndex,
 					numberOfInputChannels, taskActions, metrics);
 
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 15ff2da..18c3038 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -128,7 +128,7 @@ public class LocalInputChannelTest {
 
 			// Create a buffer pool for this partition
 			partition.registerBufferPool(
-				networkBuffers.createBufferPool(producerBufferPoolSize));
+				networkBuffers.createBufferPool(producerBufferPoolSize, producerBufferPoolSize));
 
 			// Create the producer
 			partitionProducers[i] = new TestPartitionProducer(
@@ -162,7 +162,7 @@ public class LocalInputChannelTest {
 						i,
 						parallelism,
 						numberOfBuffersPerChannel,
-						networkBuffers.createBufferPool(parallelism),
+						networkBuffers.createBufferPool(parallelism, parallelism),
 						partitionManager,
 						new TaskEventDispatcher(),
 						partitionIds)));
@@ -284,6 +284,7 @@ public class LocalInputChannelTest {
 			"test task name",
 			new JobID(),
 			new IntermediateDataSetID(),
+			ResultPartitionType.PIPELINED,
 			0,
 			1,
 			mock(TaskActions.class),
@@ -481,6 +482,7 @@ public class LocalInputChannelTest {
 					"Test Name",
 					new JobID(),
 					new IntermediateDataSetID(),
+					ResultPartitionType.PIPELINED,
 					subpartitionIndex,
 					numberOfInputChannels,
 					mock(TaskActions.class),

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index a25b8d5..2d1b4b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -73,7 +74,13 @@ public class SingleInputGateTest {
 	public void testBasicGetNextLogic() throws Exception {
 		// Setup
 		final SingleInputGate inputGate = new SingleInputGate(
-			"Test Task Name", new JobID(), new IntermediateDataSetID(), 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+			"Test Task Name", new JobID(),
+			new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
+			0, 2,
+			mock(TaskActions.class),
+			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+
+		assertEquals(ResultPartitionType.PIPELINED, inputGate.getConsumedPartitionType());
 
 		final TestInputChannel[] inputChannels = new TestInputChannel[]{
 			new TestInputChannel(inputGate, 0),
@@ -127,7 +134,12 @@ public class SingleInputGateTest {
 		// Setup reader with one local and one unknown input channel
 		final IntermediateDataSetID resultId = new IntermediateDataSetID();
 
-		final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), resultId, 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+		final SingleInputGate inputGate = new SingleInputGate(
+				"Test Task Name", new JobID(),
+				resultId, ResultPartitionType.PIPELINED,
+				0, 2,
+				mock(TaskActions.class),
+				new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 		final BufferPool bufferPool = mock(BufferPool.class);
 		when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);
 
@@ -179,6 +191,7 @@ public class SingleInputGateTest {
 			"t1",
 			new JobID(),
 			new IntermediateDataSetID(),
+			ResultPartitionType.PIPELINED,
 			0,
 			1,
 			mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
@@ -218,6 +231,7 @@ public class SingleInputGateTest {
 			"InputGate",
 			new JobID(),
 			new IntermediateDataSetID(),
+			ResultPartitionType.PIPELINED,
 			0,
 			1,
 			mock(TaskActions.class),
@@ -303,7 +317,9 @@ public class SingleInputGateTest {
 				partitionIds[2],
 				ResultPartitionLocation.createUnknown())};
 
-		InputGateDeploymentDescriptor gateDesc = new InputGateDeploymentDescriptor(new IntermediateDataSetID(), 0, channelDescs);
+		InputGateDeploymentDescriptor gateDesc =
+			new InputGateDeploymentDescriptor(new IntermediateDataSetID(),
+				ResultPartitionType.PIPELINED, 0, channelDescs);
 
 		int initialBackoff = 137;
 		int maxBackoff = 1001;
@@ -324,6 +340,8 @@ public class SingleInputGateTest {
 			mock(TaskActions.class),
 			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 
+		assertEquals(gateDesc.getConsumedPartitionType(), gate.getConsumedPartitionType());
+
 		Map<IntermediateResultPartitionID, InputChannel> channelMap = gate.getInputChannels();
 
 		assertEquals(3, channelMap.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index fe3b087..18ad490 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
@@ -55,6 +56,7 @@ public class TestSingleInputGate {
 			"Test Task Name",
 			new JobID(),
 			new IntermediateDataSetID(),
+			ResultPartitionType.PIPELINED,
 			0,
 			numberOfInputChannels,
 			mock(TaskActions.class),

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index c05df0a..bc1dd07 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.taskmanager.TaskActions;
@@ -42,8 +43,18 @@ public class UnionInputGateTest {
 	public void testBasicGetNextLogic() throws Exception {
 		// Setup
 		final String testTaskName = "Test Task";
-		final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new IntermediateDataSetID(), 0, 3, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
-		final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new IntermediateDataSetID(), 0, 5, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+		final SingleInputGate ig1 = new SingleInputGate(
+			testTaskName, new JobID(),
+			new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
+			0, 3,
+			mock(TaskActions.class),
+			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+		final SingleInputGate ig2 = new SingleInputGate(
+			testTaskName, new JobID(),
+			new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
+			0, 5,
+			mock(TaskActions.class),
+			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 
 		final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2});
 

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index e26e176..cecfe6a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -113,7 +113,8 @@ public class TaskManagerComponentsStartupShutdownTest {
 				false); // exit-jvm-on-fatal-error
 
 			final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration(
-					32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, 0, 0, null);
+					32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC,
+					0, 0, 2, 8, null);
 
 			ResourceID taskManagerId = ResourceID.generate();
 			
@@ -130,7 +131,9 @@ public class TaskManagerComponentsStartupShutdownTest {
 				null,
 				netConf.ioMode(),
 				netConf.partitionRequestInitialBackoff(),
-				netConf.partitionRequestMaxBackoff());
+				netConf.partitionRequestMaxBackoff(),
+				netConf.networkBuffersPerChannel(),
+				netConf.extraNetworkBuffersPerGate());
 
 			network.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 356d693..0b39e20 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -69,6 +69,7 @@ import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
 import org.apache.flink.runtime.messages.TaskMessages.StopTask;
 import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -91,6 +92,7 @@ import scala.concurrent.duration.FiniteDuration;
 import scala.util.Failure;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.ArrayList;
@@ -637,7 +639,7 @@ public class TaskManagerTest extends TestLogger {
 
 				InputGateDeploymentDescriptor ircdd =
 						new InputGateDeploymentDescriptor(
-								new IntermediateDataSetID(),
+								new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
 								0, new InputChannelDeploymentDescriptor[]{
 										new InputChannelDeploymentDescriptor(new ResultPartitionID(partitionId, eid1), ResultPartitionLocation.createLocal())
 								}
@@ -782,7 +784,7 @@ public class TaskManagerTest extends TestLogger {
 
 				InputGateDeploymentDescriptor ircdd =
 						new InputGateDeploymentDescriptor(
-								new IntermediateDataSetID(),
+								new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
 								0, new InputChannelDeploymentDescriptor[]{
 										new InputChannelDeploymentDescriptor(new ResultPartitionID(partitionId, eid1), ResultPartitionLocation.createLocal())
 								}
@@ -936,7 +938,7 @@ public class TaskManagerTest extends TestLogger {
 								new InputChannelDeploymentDescriptor(partitionId, loc)};
 
 				final InputGateDeploymentDescriptor igdd =
-						new InputGateDeploymentDescriptor(resultId, 0, icdd);
+						new InputGateDeploymentDescriptor(resultId, ResultPartitionType.PIPELINED, 0, icdd);
 
 				final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(
 						jid, "TestJob", vid, eid,
@@ -978,6 +980,25 @@ public class TaskManagerTest extends TestLogger {
 		}};
 	}
 
+	@Test
+	public void testTaskManagerServicesConfiguration() throws Exception {
+
+		// set some non-default values
+		final Configuration config = new Configuration();
+		config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+		config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+		config.setInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL, 10);
+		config.setInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE, 100);
+
+		TaskManagerServicesConfiguration tmConfig =
+			TaskManagerServicesConfiguration.fromConfiguration(config, InetAddress.getByName("localhost"), true);
+
+		assertEquals(tmConfig.getNetworkConfig().partitionRequestInitialBackoff(), 100);
+		assertEquals(tmConfig.getNetworkConfig().partitionRequestMaxBackoff(), 200);
+		assertEquals(tmConfig.getNetworkConfig().networkBuffersPerChannel(), 10);
+		assertEquals(tmConfig.getNetworkConfig().extraNetworkBuffersPerGate(), 100);
+	}
+
 	/**
 	 *  Tests that repeated local {@link PartitionNotFoundException}s ultimately fail the receiver.
 	 */
@@ -1031,7 +1052,7 @@ public class TaskManagerTest extends TestLogger {
 								new InputChannelDeploymentDescriptor(partitionId, loc)};
 
 				final InputGateDeploymentDescriptor igdd =
-						new InputGateDeploymentDescriptor(resultId, 0, icdd);
+						new InputGateDeploymentDescriptor(resultId, ResultPartitionType.PIPELINED, 0, icdd);
 
 				final TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(
 						jid, "TestJob", vid, eid,

http://git-wip-us.apache.org/repos/asf/flink/blob/11e2aa6d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 003eff9..c18b527 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -402,17 +402,17 @@ public class StreamingJobGraphGenerator {
 			jobEdge = downStreamVertex.connectNewDataSetAsInput(
 				headVertex,
 				DistributionPattern.POINTWISE,
-				ResultPartitionType.PIPELINED);
+				ResultPartitionType.PIPELINED_BOUNDED);
 		} else if (partitioner instanceof RescalePartitioner){
 			jobEdge = downStreamVertex.connectNewDataSetAsInput(
 				headVertex,
 				DistributionPattern.POINTWISE,
-				ResultPartitionType.PIPELINED);
+				ResultPartitionType.PIPELINED_BOUNDED);
 		} else {
 			jobEdge = downStreamVertex.connectNewDataSetAsInput(
 					headVertex,
 					DistributionPattern.ALL_TO_ALL,
-					ResultPartitionType.PIPELINED);
+					ResultPartitionType.PIPELINED_BOUNDED);
 		}
 		// set strategy name so that web interface can show it.
 		jobEdge.setShipStrategyName(partitioner.toString());