You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/11/24 09:49:22 UTC

flink git commit: [FLINK-7316][network] Always use off-heap network buffers.

Repository: flink
Updated Branches:
  refs/heads/master 54dd91603 -> 1854a3de1


[FLINK-7316][network] Always use off-heap network buffers.

This is another step towards using our own (off-heap) buffers for network
communication that we pass through netty in order to avoid unnecessary buffer
copies.

This closes #4481.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1854a3de
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1854a3de
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1854a3de

Branch: refs/heads/master
Commit: 1854a3de19a8f73a49e3c1d9438d61b5e4c6a452
Parents: 54dd916
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Aug 1 13:24:00 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Nov 24 10:48:32 2017 +0100

----------------------------------------------------------------------
 docs/ops/config.md                              |   4 +
 flink-dist/src/main/flink-bin/bin/config.sh     |   7 +-
 .../ContaineredTaskManagerParameters.java       |   4 +-
 .../io/network/buffer/NetworkBufferPool.java    |  27 ++---
 .../taskexecutor/TaskManagerServices.java       | 106 +++++++++----------
 .../TaskManagerServicesConfiguration.java       |  31 ++++--
 .../NetworkEnvironmentConfiguration.java        |  15 +--
 .../ContaineredTaskManagerParametersTest.java   |  41 ++++++-
 .../io/network/NetworkEnvironmentTest.java      |   4 +-
 .../io/network/api/writer/RecordWriterTest.java |   4 +-
 .../network/buffer/BufferPoolFactoryTest.java   |   7 +-
 .../buffer/LocalBufferPoolDestroyTest.java      |   3 +-
 .../io/network/buffer/LocalBufferPoolTest.java  |   9 +-
 .../network/buffer/NetworkBufferPoolTest.java   |  19 ++--
 .../consumer/LocalInputChannelTest.java         |   7 +-
 .../BackPressureStatsTrackerITCase.java         |   4 +-
 .../taskexecutor/TaskManagerServicesTest.java   |  21 +++-
 ...askManagerComponentsStartupShutdownTest.java |   4 +-
 .../io/BarrierBufferMassiveRandomTest.java      |  17 ++-
 .../YARNSessionCapacitySchedulerITCase.java     |   8 +-
 20 files changed, 195 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/docs/ops/config.md
----------------------------------------------------------------------
diff --git a/docs/ops/config.md b/docs/ops/config.md
index bcf7671..ed65880 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -673,6 +673,10 @@ for each point-to-point exchange of data over the network, which typically happe
 repartitioning or broadcasting steps (shuffle phase). In those, each parallel task inside the
 TaskManager has to be able to talk to all other parallel tasks.
 
+<div class="alert alert-warning">
+  <strong>Note:</strong> Since Flink 1.5, network buffers will always be allocated off-heap, i.e. outside of the JVM heap, irrespective of the value of <code>taskmanager.memory.off-heap</code>. This way, we can pass these buffers directly to the underlying network stack layers.
+</div>
+
 #### Setting Memory Fractions
 
 Previously, the number of network buffers was set manually which became a quite error-prone task

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 3055999..1b03e68 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -570,13 +570,12 @@ calculateTaskManagerHeapSizeMB() {
         exit 1
     fi
 
-    local tm_heap_size_mb=${FLINK_TM_HEAP}
+    local network_buffers_mb=$(($(calculateNetworkBufferMemory) >> 20)) # bytes to megabytes
+    # network buffers are always off-heap and thus need to be deducted from the heap memory size
+    local tm_heap_size_mb=$((${FLINK_TM_HEAP} - network_buffers_mb))
 
     if useOffHeapMemory; then
 
-        local network_buffers_mb=$(($(calculateNetworkBufferMemory) >> 20)) # bytes to megabytes
-        tm_heap_size_mb=$((tm_heap_size_mb - network_buffers_mb))
-
         if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
             # We split up the total memory in heap and off-heap memory
             if [[ "${tm_heap_size_mb}" -le "${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
index c35cf81..c4dd486 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
@@ -142,7 +142,7 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
 		// (2) split the remaining Java memory between heap and off-heap
 		final long heapSizeMB = TaskManagerServices.calculateHeapSizeMB(javaMemorySizeMB, config);
 		// use the cut-off memory for off-heap (that was its intention)
-		final long offHeapSize = javaMemorySizeMB == heapSizeMB ? -1L : containerMemoryMB - heapSizeMB;
+		final long offHeapSizeMB = containerMemoryMB - heapSizeMB;
 
 		// (3) obtain the additional environment variables from the configuration
 		final HashMap<String, String> envVars = new HashMap<>();
@@ -158,6 +158,6 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
 
 		// done
 		return new ContaineredTaskManagerParameters(
-			containerMemoryMB, heapSizeMB, offHeapSize, numSlots, envVars);
+			containerMemoryMB, heapSizeMB, offHeapSizeMB, numSlots, envVars);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index f899f05..7b817ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -21,9 +21,9 @@ package org.apache.flink.runtime.io.network.buffer;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.MathUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,7 +37,6 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * The NetworkBufferPool is a fixed size pool of {@link MemorySegment} instances
@@ -63,15 +62,14 @@ public class NetworkBufferPool implements BufferPoolFactory {
 
 	private final Object factoryLock = new Object();
 
-	private final Set<LocalBufferPool> allBufferPools = new HashSet<LocalBufferPool>();
+	private final Set<LocalBufferPool> allBufferPools = new HashSet<>();
 
 	private int numTotalRequiredBuffers;
 
 	/**
 	 * Allocates all {@link MemorySegment} instances managed by this pool.
 	 */
-	public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize, MemoryType memoryType) {
-		checkNotNull(memoryType);
+	public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {
 		
 		this.totalNumberOfMemorySegments = numberOfSegmentsToAllocate;
 		this.memorySegmentSize = segmentSize;
@@ -87,20 +85,9 @@ public class NetworkBufferPool implements BufferPoolFactory {
 		}
 
 		try {
-			if (memoryType == MemoryType.HEAP) {
-				for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
-					byte[] memory = new byte[segmentSize];
-					availableMemorySegments.add(MemorySegmentFactory.wrapPooledHeapMemory(memory, null));
-				}
-			}
-			else if (memoryType == MemoryType.OFF_HEAP) {
-				for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
-					ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);
-					availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory, null));
-				}
-			}
-			else {
-				throw new IllegalArgumentException("Unknown memory type " + memoryType);
+			for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
+				ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);
+				availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory, null));
 			}
 		}
 		catch (OutOfMemoryError err) {
@@ -336,7 +323,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
 			return;
 		}
 
-		/**
+		/*
 		 * With buffer pools being potentially limited, let's distribute the available memory
 		 * segments based on the capacity of each buffer pool, i.e. the maximum number of segments
 		 * an unlimited buffer pool can take is numAvailableMemorySegment, for limited buffer pools

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 4daff05..0756529 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -213,11 +213,11 @@ public class TaskManagerServices {
 		// computing the amount of memory to use depends on how much memory is available
 		// it strictly needs to happen AFTER the network stack has been initialized
 
-		MemoryType memType = taskManagerServicesConfiguration.getNetworkConfig().memoryType();
-
 		// check if a value has been configured
 		long configuredMemory = taskManagerServicesConfiguration.getConfiguredMemory();
 
+		MemoryType memType = taskManagerServicesConfiguration.getMemoryType();
+
 		final long memorySize;
 
 		boolean preAllocateMemory = taskManagerServicesConfiguration.isPreAllocateMemory();
@@ -234,7 +234,7 @@ public class TaskManagerServices {
 			float memoryFraction = taskManagerServicesConfiguration.getMemoryFraction();
 
 			if (memType == MemoryType.HEAP) {
-				// network buffers already allocated -> use memoryFraction of the remaining heap:
+				// network buffers allocated off-heap -> use memoryFraction of the available heap:
 				long relativeMemSize = (long) (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * memoryFraction);
 				if (preAllocateMemory) {
 					LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." ,
@@ -247,10 +247,10 @@ public class TaskManagerServices {
 			} else if (memType == MemoryType.OFF_HEAP) {
 				// The maximum heap memory has been adjusted according to the fraction (see
 				// calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config)), i.e.
-				// maxJvmHeap = jvmHeapNoNet - jvmHeapNoNet * memoryFraction = jvmHeapNoNet * (1 - memoryFraction)
-				// directMemorySize = jvmHeapNoNet * memoryFraction
-				long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory();
-				long directMemorySize = (long) (maxMemory / (1.0 - memoryFraction) * memoryFraction);
+				// maxJvmHeap = jvmTotalNoNet - jvmTotalNoNet * memoryFraction = jvmTotalNoNet * (1 - memoryFraction)
+				// directMemorySize = jvmTotalNoNet * memoryFraction
+				long maxJvmHeap = EnvironmentInformation.getMaxJvmHeapMemory();
+				long directMemorySize = (long) (maxJvmHeap / (1.0 - memoryFraction) * memoryFraction);
 				if (preAllocateMemory) {
 					LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)." ,
 						memoryFraction, directMemorySize >> 20);
@@ -312,8 +312,7 @@ public class TaskManagerServices {
 
 		NetworkBufferPool networkBufferPool = new NetworkBufferPool(
 			(int) numNetBuffersLong,
-			segmentSize,
-			networkEnvironmentConfiguration.memoryType());
+			segmentSize);
 
 		ConnectionManager connectionManager;
 
@@ -390,7 +389,7 @@ public class TaskManagerServices {
 	 * @param config
 	 * 		configuration object
 	 *
-	 * @return memory to use for network buffers (in bytes)
+	 * @return memory to use for network buffers (in bytes); at least one memory segment
 	 */
 	@SuppressWarnings("deprecation")
 	public static long calculateNetworkBufferMemory(long totalJavaMemorySize, Configuration config) {
@@ -419,6 +418,14 @@ public class TaskManagerServices {
 						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
 					"Network buffer memory size too large: " + networkBufBytes + " >= " +
 						totalJavaMemorySize + " (total JVM memory size)");
+			TaskManagerServicesConfiguration
+				.checkConfigParameter(networkBufBytes >= segmentSize,
+					"(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
+					"(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
+						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
+						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
+					"Network buffer memory size too small: " + networkBufBytes + " < " +
+						segmentSize + " (" + TaskManagerOptions.MEMORY_SEGMENT_SIZE.key() + ")");
 		} else {
 			// use old (deprecated) network buffers parameter
 			int numNetworkBuffers = config.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
@@ -431,6 +438,11 @@ public class TaskManagerServices {
 					networkBufBytes, TaskManagerOptions.NETWORK_NUM_BUFFERS.key(),
 					"Network buffer memory size too large: " + networkBufBytes + " >= " +
 						totalJavaMemorySize + " (total JVM memory size)");
+			TaskManagerServicesConfiguration
+				.checkConfigParameter(networkBufBytes >= segmentSize,
+					networkBufBytes, TaskManagerOptions.NETWORK_NUM_BUFFERS.key(),
+					"Network buffer memory size too small: " + networkBufBytes + " < " +
+						segmentSize + " (" + TaskManagerOptions.MEMORY_SEGMENT_SIZE.key() + ")");
 		}
 
 		return networkBufBytes;
@@ -469,37 +481,24 @@ public class TaskManagerServices {
 			return networkBufMin;
 		}
 
-		// relative network buffer pool size using the fraction
+		// relative network buffer pool size using the fraction...
 
-		final MemoryType memType = networkConfig.memoryType();
+		// The maximum heap memory has been adjusted as in
+		// calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config))
+		// and we need to invert these calculations.
 
-		final long networkBufBytes;
-		if (memType == MemoryType.HEAP) {
-			// use fraction parts of the available heap memory
+		final MemoryType memType = tmConfig.getMemoryType();
 
-			final long relativeMemSize = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag();
-			networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin,
-				(long) (networkBufFraction * relativeMemSize)));
+		final long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory();
 
-			TaskManagerServicesConfiguration
-				.checkConfigParameter(networkBufBytes < relativeMemSize,
-					"(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
-					"(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
-						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
-						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
-					"Network buffer memory size too large: " + networkBufBytes + " >= " +
-						relativeMemSize + "(free JVM heap size)");
+		final long jvmHeapNoNet;
+		if (memType == MemoryType.HEAP) {
+			jvmHeapNoNet = maxMemory;
 		} else if (memType == MemoryType.OFF_HEAP) {
-			// The maximum heap memory has been adjusted accordingly as in
-			// calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config))
-			// and we need to invert these calculations.
-
-			final long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory();
 
 			// check if a value has been configured
 			long configuredMemory = tmConfig.getConfiguredMemory() << 20; // megabytes to bytes
 
-			final long jvmHeapNoNet;
 			if (configuredMemory > 0) {
 				// The maximum heap memory has been adjusted according to configuredMemory, i.e.
 				// maxJvmHeap = jvmHeapNoNet - configuredMemory
@@ -512,25 +511,25 @@ public class TaskManagerServices {
 				final float managedFraction = tmConfig.getMemoryFraction();
 				jvmHeapNoNet = (long) (maxMemory / (1.0 - managedFraction));
 			}
-
-			// finally extract the network buffer memory size again from:
-			// jvmHeapNoNet = jvmHeap - networkBufBytes
-			//              = jvmHeap - Math.min(networkBufMax, Math.max(networkBufMin, jvmHeap * netFraction)
-			networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin,
-				(long) (jvmHeapNoNet / (1.0 - networkBufFraction) * networkBufFraction)));
-
-			TaskManagerServicesConfiguration
-				.checkConfigParameter(networkBufBytes < maxMemory,
-					"(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
-					"(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
-						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
-						TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
-					"Network buffer memory size too large: " + networkBufBytes + " >= " +
-						maxMemory + "(maximum JVM heap size)");
 		} else {
 			throw new RuntimeException("No supported memory type detected.");
 		}
 
+		// finally extract the network buffer memory size again from:
+		// jvmHeapNoNet = jvmHeap - networkBufBytes
+		//              = jvmHeap - Math.min(networkBufMax, Math.max(networkBufMin, jvmHeap * netFraction)
+		final long networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin,
+			(long) (jvmHeapNoNet / (1.0 - networkBufFraction) * networkBufFraction)));
+
+		TaskManagerServicesConfiguration
+			.checkConfigParameter(networkBufBytes < maxMemory,
+				"(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
+				"(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
+					TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
+					TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
+				"Network buffer memory size too large: " + networkBufBytes + " >= " +
+					maxMemory + "(maximum JVM heap size)");
+
 		return networkBufBytes;
 	}
 
@@ -548,7 +547,12 @@ public class TaskManagerServices {
 	public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config) {
 		Preconditions.checkArgument(totalJavaMemorySizeMB > 0);
 
-		final long totalJavaMemorySize = totalJavaMemorySizeMB << 20; // megabytes to bytes
+		// subtract the Java memory used for network buffers (always off-heap)
+		final long networkBufMB =
+			calculateNetworkBufferMemory(
+				totalJavaMemorySizeMB << 20, // megabytes to bytes
+				config) >> 20; // bytes to megabytes
+		final long remainingJavaMemorySizeMB = totalJavaMemorySizeMB - networkBufMB;
 
 		// split the available Java memory between heap and off-heap
 
@@ -557,10 +561,6 @@ public class TaskManagerServices {
 		final long heapSizeMB;
 		if (useOffHeap) {
 
-			// subtract the Java memory used for network buffers
-			final long networkBufMB = calculateNetworkBufferMemory(totalJavaMemorySize, config) >> 20; // bytes to megabytes
-			final long remainingJavaMemorySizeMB = totalJavaMemorySizeMB - networkBufMB;
-
 			long offHeapSize = config.getLong(TaskManagerOptions.MANAGED_MEMORY_SIZE);
 
 			if (offHeapSize <= 0) {
@@ -578,7 +578,7 @@ public class TaskManagerServices {
 
 			heapSizeMB = remainingJavaMemorySizeMB - offHeapSize;
 		} else {
-			heapSizeMB = totalJavaMemorySizeMB;
+			heapSizeMB = remainingJavaMemorySizeMB;
 		}
 
 		return heapSizeMB;

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index bae683b..4f8641e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -67,6 +67,8 @@ public class TaskManagerServicesConfiguration {
 	 */
 	private final long configuredMemory;
 
+	private final MemoryType memoryType;
+
 	private final boolean preAllocateMemory;
 
 	private final float memoryFraction;
@@ -80,6 +82,7 @@ public class TaskManagerServicesConfiguration {
 			QueryableStateConfiguration queryableStateConfig,
 			int numberOfSlots,
 			long configuredMemory,
+			MemoryType memoryType,
 			boolean preAllocateMemory,
 			float memoryFraction,
 			long timerServiceShutdownTimeout) {
@@ -91,6 +94,7 @@ public class TaskManagerServicesConfiguration {
 		this.numberOfSlots = checkNotNull(numberOfSlots);
 
 		this.configuredMemory = configuredMemory;
+		this.memoryType = checkNotNull(memoryType);
 		this.preAllocateMemory = preAllocateMemory;
 		this.memoryFraction = memoryFraction;
 
@@ -128,6 +132,15 @@ public class TaskManagerServicesConfiguration {
 	}
 
 	/**
+	 * Returns the memory type to use.
+	 *
+	 * @return on-heap or off-heap memory
+	 */
+	public MemoryType getMemoryType() {
+		return memoryType;
+	}
+
+	/**
 	 * Returns the size of the managed memory (in megabytes), if configured.
 	 *
 	 * @return managed memory or a default value (currently <tt>-1</tt>) if not configured
@@ -194,6 +207,14 @@ public class TaskManagerServicesConfiguration {
 				"If you leave this config parameter empty, the system automatically " +
 				"pick a fraction of the available memory.");
 
+		// check whether we use heap or off-heap memory
+		final MemoryType memType;
+		if (configuration.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) {
+			memType = MemoryType.OFF_HEAP;
+		} else {
+			memType = MemoryType.HEAP;
+		}
+
 		boolean preAllocateMemory = configuration.getBoolean(TaskManagerOptions.MANAGED_MEMORY_PRE_ALLOCATE);
 
 		float memoryFraction = configuration.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
@@ -210,6 +231,7 @@ public class TaskManagerServicesConfiguration {
 			queryableStateConfig,
 			slots,
 			configuredMemory,
+			memType,
 			preAllocateMemory,
 			memoryFraction,
 			timerServiceShutdownTimeout);
@@ -258,14 +280,6 @@ public class TaskManagerServicesConfiguration {
 			TaskManagerOptions.MEMORY_SEGMENT_SIZE.key(),
 			"Memory segment size must be a power of 2.");
 
-		// check whether we use heap or off-heap memory
-		final MemoryType memType;
-		if (configuration.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) {
-			memType = MemoryType.OFF_HEAP;
-		} else {
-			memType = MemoryType.HEAP;
-		}
-
 		// network buffer memory fraction
 
 		float networkBufFraction = configuration.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
@@ -324,7 +338,6 @@ public class TaskManagerServicesConfiguration {
 			networkBufMin,
 			networkBufMax,
 			pageSize,
-			memType,
 			ioMode,
 			initialRequestBackoff,
 			maxRequestBackoff,

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
index 193fd90..6c66c77 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 
@@ -37,8 +36,6 @@ public class NetworkEnvironmentConfiguration {
 
 	private final int networkBufferSize;
 
-	private final MemoryType memoryType;
-
 	private final IOMode ioMode;
 
 	private final int partitionRequestInitialBackoff;
@@ -59,7 +56,6 @@ public class NetworkEnvironmentConfiguration {
 			long networkBufMin,
 			long networkBufMax,
 			int networkBufferSize,
-			MemoryType memoryType,
 			IOMode ioMode,
 			int partitionRequestInitialBackoff,
 			int partitionRequestMaxBackoff,
@@ -67,7 +63,7 @@ public class NetworkEnvironmentConfiguration {
 			int floatingNetworkBuffersPerGate) {
 
 		this(networkBufFraction, networkBufMin, networkBufMax, networkBufferSize,
-				memoryType, ioMode,
+				ioMode,
 				partitionRequestInitialBackoff, partitionRequestMaxBackoff,
 				networkBuffersPerChannel, floatingNetworkBuffersPerGate,
 				null);
@@ -79,7 +75,6 @@ public class NetworkEnvironmentConfiguration {
 			long networkBufMin,
 			long networkBufMax,
 			int networkBufferSize,
-			MemoryType memoryType,
 			IOMode ioMode,
 			int partitionRequestInitialBackoff,
 			int partitionRequestMaxBackoff,
@@ -91,7 +86,6 @@ public class NetworkEnvironmentConfiguration {
 		this.networkBufMin = networkBufMin;
 		this.networkBufMax = networkBufMax;
 		this.networkBufferSize = networkBufferSize;
-		this.memoryType = memoryType;
 		this.ioMode = ioMode;
 		this.partitionRequestInitialBackoff = partitionRequestInitialBackoff;
 		this.partitionRequestMaxBackoff = partitionRequestMaxBackoff;
@@ -118,10 +112,6 @@ public class NetworkEnvironmentConfiguration {
 		return networkBufferSize;
 	}
 
-	public MemoryType memoryType() {
-		return memoryType;
-	}
-
 	public IOMode ioMode() {
 		return ioMode;
 	}
@@ -152,7 +142,6 @@ public class NetworkEnvironmentConfiguration {
 	public int hashCode() {
 		int result = 1;
 		result = 31 * result + networkBufferSize;
-		result = 31 * result + memoryType.hashCode();
 		result = 31 * result + ioMode.hashCode();
 		result = 31 * result + partitionRequestInitialBackoff;
 		result = 31 * result + partitionRequestMaxBackoff;
@@ -181,7 +170,6 @@ public class NetworkEnvironmentConfiguration {
 					this.partitionRequestMaxBackoff == that.partitionRequestMaxBackoff &&
 					this.networkBuffersPerChannel == that.networkBuffersPerChannel &&
 					this.floatingNetworkBuffersPerGate == that.floatingNetworkBuffersPerGate &&
-					this.memoryType == that.memoryType &&
 					this.ioMode == that.ioMode && 
 					(nettyConfig != null ? nettyConfig.equals(that.nettyConfig) : that.nettyConfig == null);
 		}
@@ -194,7 +182,6 @@ public class NetworkEnvironmentConfiguration {
 				", networkBufMin=" + networkBufMin +
 				", networkBufMax=" + networkBufMax +
 				", networkBufferSize=" + networkBufferSize +
-				", memoryType=" + memoryType +
 				", ioMode=" + ioMode +
 				", partitionRequestInitialBackoff=" + partitionRequestInitialBackoff +
 				", partitionRequestMaxBackoff=" + partitionRequestMaxBackoff +

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
index ad11f70..8d9ea88 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.runtime.clusterframework;
 
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import static org.apache.flink.configuration.TaskManagerOptions.MEMORY_OFF_HEAP;
+import static org.apache.flink.runtime.taskexecutor.TaskManagerServices.calculateNetworkBufferMemory;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -30,7 +32,7 @@ public class ContaineredTaskManagerParametersTest extends TestLogger {
 	private static final long CONTAINER_MEMORY = 8192L;
 
 	/**
-	 * This tests that per default the off heap memory is set to -1.
+	 * This tests that per default the off heap memory is set to what the network buffers require.
 	 */
 	@Test
 	public void testOffHeapMemoryWithDefaultConfiguration() {
@@ -38,15 +40,46 @@ public class ContaineredTaskManagerParametersTest extends TestLogger {
 
 		ContaineredTaskManagerParameters params =
 			ContaineredTaskManagerParameters.create(conf, CONTAINER_MEMORY, 1);
-		assertEquals(-1L, params.taskManagerDirectMemoryLimitMB());
+
+		final float memoryCutoffRatio = conf.getFloat(
+			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO,
+			ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF_RATIO);
+		final int minCutoff = conf.getInteger(
+			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_MIN,
+			ConfigConstants.DEFAULT_YARN_HEAP_CUTOFF);
+
+		long cutoff = Math.max((long) (CONTAINER_MEMORY * memoryCutoffRatio), minCutoff);
+		final long networkBufMB =
+			calculateNetworkBufferMemory(
+				(CONTAINER_MEMORY - cutoff) << 20, // megabytes to bytes
+				conf) >> 20; // bytes to megabytes
+		assertEquals(networkBufMB + cutoff, params.taskManagerDirectMemoryLimitMB());
+	}
+
+	/**
+	 * This tests that when using on-heap memory the sum of on and off heap memory does not exceed the container
+	 * maximum.
+	 */
+	@Test
+	public void testTotalMemoryDoesNotExceedContainerMemoryOnHeap() {
+		Configuration conf = new Configuration();
+		conf.setBoolean(MEMORY_OFF_HEAP, false);
+
+		ContaineredTaskManagerParameters params =
+			ContaineredTaskManagerParameters.create(conf, CONTAINER_MEMORY, 1);
+
+		assertTrue(params.taskManagerDirectMemoryLimitMB() > 0L);
+
+		assertTrue(params.taskManagerHeapSizeMB() +
+			params.taskManagerDirectMemoryLimitMB() <= CONTAINER_MEMORY);
 	}
 
 	/**
-	 * This tests that when using off heap memory the sum of on and off heap memory does not exceeds the container
+	 * This tests that when using off-heap memory the sum of on and off heap memory does not exceed the container
 	 * maximum.
 	 */
 	@Test
-	public void testTotalMemoryDoesNotExceedContainerMemory() {
+	public void testTotalMemoryDoesNotExceedContainerMemoryOffHeap() {
 		Configuration conf = new Configuration();
 		conf.setBoolean(MEMORY_OFF_HEAP, true);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index ef2d5c2..123082f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -33,6 +32,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskActions;
+
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -63,7 +63,7 @@ public class NetworkEnvironmentTest {
 	public void testRegisterTaskUsesBoundedBuffers() throws Exception {
 
 		final NetworkEnvironment network = new NetworkEnvironment(
-			new NetworkBufferPool(numBuffers, memorySegmentSize, MemoryType.HEAP),
+			new NetworkBufferPool(numBuffers, memorySegmentSize),
 			new LocalConnectionManager(),
 			new ResultPartitionManager(),
 			new TaskEventDispatcher(),

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index df5c616..9f699da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -42,6 +41,7 @@ import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.runtime.testutils.DiscardingRecycler;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.util.XORShiftRandom;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -176,7 +176,7 @@ public class RecordWriterTest {
 		BufferPool bufferPool = null;
 
 		try {
-			buffers = new NetworkBufferPool(1, 1024, MemoryType.HEAP);
+			buffers = new NetworkBufferPool(1, 1024);
 			bufferPool = spy(buffers.createBufferPool(1, Integer.MAX_VALUE));
 
 			ResultPartitionWriter partitionWriter = mock(ResultPartitionWriter.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
index 941aeae..d15aba6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.buffer;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemoryType;
 
 import org.junit.After;
 import org.junit.Before;
@@ -55,7 +54,7 @@ public class BufferPoolFactoryTest {
 
 	@Before
 	public void setupNetworkBufferPool() {
-		networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize, MemoryType.HEAP);
+		networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize);
 	}
 
 	@After
@@ -245,7 +244,7 @@ public class BufferPoolFactoryTest {
 
 	@Test
 	public void testUniformDistributionBounded3() throws IOException {
-		NetworkBufferPool globalPool = new NetworkBufferPool(3, 128, MemoryType.HEAP);
+		NetworkBufferPool globalPool = new NetworkBufferPool(3, 128);
 		try {
 			BufferPool first = globalPool.createBufferPool(0, 10);
 			assertEquals(3, first.getNumBuffers());
@@ -278,7 +277,7 @@ public class BufferPoolFactoryTest {
 	 */
 	@Test
 	public void testUniformDistributionBounded4() throws IOException {
-		NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, MemoryType.HEAP);
+		NetworkBufferPool globalPool = new NetworkBufferPool(10, 128);
 		try {
 			BufferPool first = globalPool.createBufferPool(0, 10);
 			assertEquals(10, first.getNumBuffers());

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
index 18e2136..6e02542 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.buffer;
 
-import org.apache.flink.core.memory.MemoryType;
 import org.junit.Test;
 
 import java.util.concurrent.atomic.AtomicReference;
@@ -46,7 +45,7 @@ public class LocalBufferPoolDestroyTest {
 		LocalBufferPool localBufferPool = null;
 
 		try {
-			networkBufferPool = new NetworkBufferPool(1, 4096, MemoryType.HEAP);
+			networkBufferPool = new NetworkBufferPool(1, 4096);
 			localBufferPool = new LocalBufferPool(networkBufferPool, 1);
 
 			// Drain buffer pool

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index 7a309d7..4eb568a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.io.network.buffer;
 
-import org.apache.flink.core.memory.MemoryType;
-
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
 import org.junit.After;
@@ -63,7 +61,7 @@ public class LocalBufferPoolTest {
 
 	@Before
 	public void setupLocalBufferPool() {
-		networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize, MemoryType.HEAP);
+		networkBufferPool = new NetworkBufferPool(numBuffers, memorySegmentSize);
 		localBufferPool = new LocalBufferPool(networkBufferPool, 1);
 
 		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
@@ -77,6 +75,9 @@ public class LocalBufferPoolTest {
 
 		String msg = "Did not return all buffers to memory segment pool after test.";
 		assertEquals(msg, numBuffers, networkBufferPool.getNumberOfAvailableMemorySegments());
+		// no other local buffer pools used than the one above, but call just in case
+		networkBufferPool.destroyAllBufferPools();
+		networkBufferPool.destroy();
 	}
 
 	@AfterClass
@@ -227,7 +228,7 @@ public class LocalBufferPoolTest {
 		// and the twoTimesListener will be added into the registeredListeners
 		// queue of buffer pool again
 		available1.recycle();
-		
+
 		verify(oneTimeListener, times(1)).notifyBufferAvailable(any(Buffer.class));
 		verify(twoTimesListener, times(1)).notifyBufferAvailable(any(Buffer.class));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index e30e955..4d7648a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -19,9 +19,9 @@
 package org.apache.flink.runtime.io.network.buffer;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -31,10 +31,9 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import static org.hamcrest.core.IsCollectionContaining.hasItem;
 import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.core.IsCollectionContaining.hasItem;
 import static org.hamcrest.core.IsNot.not;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -54,7 +53,7 @@ public class NetworkBufferPoolTest {
 			final int bufferSize = 128;
 			final int numBuffers = 10;
 
-			NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, bufferSize, MemoryType.HEAP);
+			NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, bufferSize);
 			assertEquals(bufferSize, globalPool.getMemorySegmentSize());
 			assertEquals(numBuffers, globalPool.getTotalNumberOfMemorySegments());
 			assertEquals(numBuffers, globalPool.getNumberOfAvailableMemorySegments());
@@ -98,7 +97,7 @@ public class NetworkBufferPoolTest {
 	@Test
 	public void testDestroyAll() {
 		try {
-			NetworkBufferPool globalPool = new NetworkBufferPool(10, 128, MemoryType.HEAP);
+			NetworkBufferPool globalPool = new NetworkBufferPool(10, 128);
 
 			BufferPool fixedPool = globalPool.createBufferPool(2, 2);
 			BufferPool boundedPool = globalPool.createBufferPool(0, 1);
@@ -193,7 +192,7 @@ public class NetworkBufferPoolTest {
 	public void testRequestMemorySegmentsLessThanTotalBuffers() throws Exception {
 		final int numBuffers = 10;
 
-		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
 
 		List<MemorySegment> memorySegments = Collections.emptyList();
 		try {
@@ -217,7 +216,7 @@ public class NetworkBufferPoolTest {
 	public void testRequestMemorySegmentsMoreThanTotalBuffers() throws Exception {
 		final int numBuffers = 10;
 
-		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
 
 		try {
 			globalPool.requestMemorySegments(numBuffers + 1);
@@ -237,7 +236,7 @@ public class NetworkBufferPoolTest {
 	public void testRequestMemorySegmentsWithInvalidArgument() throws Exception {
 		final int numBuffers = 10;
 
-		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
 
 		try {
 			// the number of requested buffers should be larger than zero
@@ -258,7 +257,7 @@ public class NetworkBufferPoolTest {
 	public void testRequestMemorySegmentsWithBuffersTaken() throws IOException, InterruptedException {
 		final int numBuffers = 10;
 
-		NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+		NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128);
 
 		final List<Buffer> buffers = new ArrayList<>(numBuffers);
 		List<MemorySegment> memorySegments = Collections.emptyList();
@@ -314,7 +313,7 @@ public class NetworkBufferPoolTest {
 	public void testRequestMemorySegmentsInterruptable() throws Exception {
 		final int numBuffers = 10;
 
-		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128, MemoryType.HEAP);
+		NetworkBufferPool globalPool = new NetworkBufferPool(numBuffers, 128);
 		MemorySegment segment = globalPool.requestMemorySegment();
 		assertNotNull(segment);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index e685f17..5f7fd82 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -47,7 +46,6 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-import scala.Tuple2;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -59,6 +57,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import scala.Tuple2;
+
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -95,7 +95,7 @@ public class LocalInputChannelTest {
 
 		final NetworkBufferPool networkBuffers = new NetworkBufferPool(
 			(parallelism * producerBufferPoolSize) + (parallelism * parallelism),
-			TestBufferFactory.BUFFER_SIZE, MemoryType.HEAP);
+			TestBufferFactory.BUFFER_SIZE);
 
 		final ResultPartitionConsumableNotifier partitionConsumableNotifier =
 			mock(ResultPartitionConsumableNotifier.class);
@@ -176,6 +176,7 @@ public class LocalInputChannelTest {
 			}
 		}
 		finally {
+			networkBuffers.destroyAllBufferPools();
 			networkBuffers.destroy();
 			executor.shutdown();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
index dc22752..6b0f251 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.rest.handler.legacy.backpressure;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
@@ -76,12 +75,13 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
 	@BeforeClass
 	public static void setup() {
 		testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
-		networkBufferPool = new NetworkBufferPool(100, 8192, MemoryType.HEAP);
+		networkBufferPool = new NetworkBufferPool(100, 8192);
 	}
 
 	@AfterClass
 	public static void teardown() {
 		JavaTestKit.shutdownActorSystem(testActorSystem);
+		networkBufferPool.destroyAllBufferPools();
 		networkBufferPool.destroy();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
index a8358a1..1ec280d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesTest.java
@@ -204,8 +204,16 @@ public class TaskManagerServicesTest {
 		tmConfig = getTmConfig(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue(),
 			TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
 			0.1f, 60L << 20, 1L << 30, MemoryType.HEAP);
-		when(EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag()).thenReturn(1000L << 20); // 1000MB
-		assertEquals(100L << 20, TaskManagerServices.calculateNetworkBufferMemory(tmConfig));
+		when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(900L << 20); // 900MB
+		assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
+			TaskManagerServices.calculateNetworkBufferMemory(tmConfig));
+
+		tmConfig = getTmConfig(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue(),
+			TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
+			0.2f, 60L << 20, 1L << 30, MemoryType.HEAP);
+		when(EnvironmentInformation.getMaxJvmHeapMemory()).thenReturn(800L << 20); // 800MB
+		assertEquals((200L << 20) + 3 /* slightly too many due to floating point imprecision */,
+			TaskManagerServices.calculateNetworkBufferMemory(tmConfig));
 
 		tmConfig = getTmConfig(10, TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
 			0.1f, 60L << 20, 1L << 30, MemoryType.OFF_HEAP);
@@ -242,7 +250,6 @@ public class TaskManagerServicesTest {
 			networkBufMin,
 			networkBufMax,
 			TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue(),
-			memType,
 			null,
 			TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(),
 			TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(),
@@ -257,6 +264,7 @@ public class TaskManagerServicesTest {
 			QueryableStateConfiguration.disabled(),
 			1,
 			managedMemory,
+			memType,
 			false,
 			managedMemoryFraction,
 			0);
@@ -274,9 +282,14 @@ public class TaskManagerServicesTest {
 		config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 1L << 30); // 1GB
 
 		config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, false);
-		assertEquals(1000, TaskManagerServices.calculateHeapSizeMB(1000, config));
+		assertEquals(900, TaskManagerServices.calculateHeapSizeMB(1000, config));
+
+		config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, false);
+		config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.2f);
+		assertEquals(800, TaskManagerServices.calculateHeapSizeMB(1000, config));
 
 		config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);
+		config.setFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
 		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 10); // 10MB
 		assertEquals(890, TaskManagerServices.calculateHeapSizeMB(1000, config));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 4b62770..98b5b8b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -130,7 +130,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
 			// note: the network buffer memory configured here is not actually used below but set
 			// accordingly to be consistent
 			final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration(
-					0.1f, networkBufNum * BUFFER_SIZE, networkBufNum * BUFFER_SIZE, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC,
+					0.1f, networkBufNum * BUFFER_SIZE, networkBufNum * BUFFER_SIZE, BUFFER_SIZE, IOManager.IOMode.SYNC,
 					0, 0, 2, 8, null);
 
 			ResourceID taskManagerId = ResourceID.generate();
@@ -140,7 +140,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
 			final MemoryManager memManager = new MemoryManager(networkBufNum * BUFFER_SIZE, 1, BUFFER_SIZE, MemoryType.HEAP, false);
 			final IOManager ioManager = new IOManagerAsync(TMP_DIR);
 			final NetworkEnvironment network = new NetworkEnvironment(
-				new NetworkBufferPool(32, netConf.networkBufferSize(), netConf.memoryType()),
+				new NetworkBufferPool(32, netConf.networkBufferSize()),
 				new LocalConnectionManager(),
 				new ResultPartitionManager(),
 				new TaskEventDispatcher(),

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index c801d09..090e44a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -49,11 +48,15 @@ public class BarrierBufferMassiveRandomTest {
 	@Test
 	public void testWithTwoChannelsAndRandomBarriers() {
 		IOManager ioMan = null;
+		NetworkBufferPool networkBufferPool1 = null;
+		NetworkBufferPool networkBufferPool2 = null;
 		try {
 			ioMan = new IOManagerAsync();
 
-			BufferPool pool1 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100, 100);
-			BufferPool pool2 = new NetworkBufferPool(100, PAGE_SIZE, MemoryType.HEAP).createBufferPool(100, 100);
+			networkBufferPool1 = new NetworkBufferPool(100, PAGE_SIZE);
+			networkBufferPool2 = new NetworkBufferPool(100, PAGE_SIZE);
+			BufferPool pool1 = networkBufferPool1.createBufferPool(100, 100);
+			BufferPool pool2 = networkBufferPool2.createBufferPool(100, 100);
 
 			RandomGeneratingInputGate myIG = new RandomGeneratingInputGate(
 					new BufferPool[] { pool1, pool2 },
@@ -76,6 +79,14 @@ public class BarrierBufferMassiveRandomTest {
 			if (ioMan != null) {
 				ioMan.shutdown();
 			}
+			if (networkBufferPool1 != null) {
+				networkBufferPool1.destroyAllBufferPools();
+				networkBufferPool1.destroy();
+			}
+			if (networkBufferPool2 != null) {
+				networkBufferPool2.destroyAllBufferPools();
+				networkBufferPool2.destroy();
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1854a3de/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 03c61e8..8716f8a 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -465,7 +465,9 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 				"-yt", flinkLibFolder.getAbsolutePath(),
 				"-yn", "1",
 				"-yjm", "768",
-				"-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly
+				// test if the cutoff is passed correctly (only useful when larger than the value
+				// of containerized.heap-cutoff-min (default: 600MB))
+				"-yD", "yarn.heap-cutoff-ratio=0.7",
 				"-yD", "yarn.tags=test-tag",
 				"-ytm", "1024",
 				"-ys", "2", // test requesting slots from YARN.
@@ -544,8 +546,8 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 			});
 			Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog);
 			content = FileUtils.readFileToString(jobmanagerLog);
-			// TM was started with 1024 but we cut off 50% (NOT THE DEFAULT VALUE)
-			String expected = "Starting TaskManagers with command: $JAVA_HOME/bin/java -Xms424m -Xmx424m";
+			// TM was started with 1024 but we cut off 70% (NOT THE DEFAULT VALUE)
+			String expected = "Starting TaskManagers with command: $JAVA_HOME/bin/java -Xms244m -Xmx244m -XX:MaxDirectMemorySize=780m";
 			Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log: '" + jobmanagerLog + "'",
 				content.contains(expected));
 			expected = " (2/2) (attempt #0) to ";