You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2019/07/30 10:09:41 UTC

[flink] 02/04: [FLINK-12171][Network] Do not limit the network buffer memory by heap size on the TM side

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e457cd46fdb285813f08242d9df5fbe4c81f0b2b
Author: Gao Yun <yu...@alibaba-inc.com>
AuthorDate: Thu Jul 11 20:00:11 2019 +0800

    [FLINK-12171][Network] Do not limit the network buffer memory by heap size on the TM side
---
 .../TaskManagerHeapSizeCalculationJavaBashTest.java   | 12 ++++++++++++
 .../NettyShuffleEnvironmentConfiguration.java         | 19 +++++++++++--------
 .../NettyShuffleEnvironmentConfigurationTest.java     |  7 +++++++
 .../taskexecutor/NetworkBufferCalculationTest.java    | 12 ++++++++++++
 4 files changed, 42 insertions(+), 8 deletions(-)

diff --git a/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java b/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
index 44277c5..b965355 100755
--- a/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
+++ b/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
@@ -94,6 +94,12 @@ public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
 		compareNetworkBufJavaVsScript(
 			getConfig(totalMemoryInMB, true, 0.1f, networkBufMin, networkBufMax, managedMemSize, 0.1f), 0.0f);
 
+		compareNetworkBufJavaVsScript(
+			getConfig(totalMemoryInMB, false, 0.6f, networkBufMin, networkBufMax, managedMemSize, managedMemFrac), 0.0f);
+
+		compareNetworkBufJavaVsScript(
+			getConfig(totalMemoryInMB, true, 0.6f, networkBufMin, networkBufMax, 10 /*MB*/, managedMemFrac), 0.0f);
+
 		// some automated tests with random (but valid) values
 
 		Random ran = new Random();
@@ -127,6 +133,12 @@ public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
 		compareHeapSizeJavaVsScript(
 			getConfig(totalMemoryInMB, true, 0.1f, networkBufMin, networkBufMax, managedMemSize, 0.1f), 0.0f);
 
+		compareHeapSizeJavaVsScript(
+			getConfig(totalMemoryInMB, false, 0.6f, networkBufMin, networkBufMax, managedMemSize, managedMemFrac), 0.0f);
+
+		compareHeapSizeJavaVsScript(
+			getConfig(totalMemoryInMB, true, 0.6f, networkBufMin, networkBufMax, 10 /*MB*/, managedMemFrac), 0.0f);
+
 		// some automated tests with random (but valid) values
 
 		Random ran = new Random();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index e73cc6a..31c7ca2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -256,12 +256,15 @@ public class NettyShuffleEnvironmentConfiguration {
 		}
 
 		// finally extract the network buffer memory size again from:
-		// jvmHeapNoNet = jvmHeap - networkBufBytes
-		//              = jvmHeap - Math.min(networkBufMax, Math.max(networkBufMin, jvmHeap * netFraction)
-		// jvmHeap = jvmHeapNoNet / (1.0 - networkBufFraction)
+		// jvmHeapNoNet = jvmTotal - networkBufBytes
+		//              = jvmTotal - Math.min(networkBufMax, Math.max(networkBufMin, jvmHeap * netFraction)
+		// jvmTotal = jvmHeapNoNet / (1.0 - networkBufFraction)
 		float networkBufFraction = config.getFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
 		long networkBufSize = (long) (jvmHeapNoNet / (1.0 - networkBufFraction) * networkBufFraction);
-		return calculateNewNetworkBufferMemory(config, networkBufSize, maxJvmHeapMemory);
+
+		// Do not need to check the maximum allowed memory since the computed total memory should always
+		// be larger than the computed network buffer memory as long as the fraction is less than 1.
+		return calculateNewNetworkBufferMemory(config, networkBufSize, Long.MAX_VALUE);
 	}
 
 	/**
@@ -319,11 +322,11 @@ public class NettyShuffleEnvironmentConfiguration {
 	 *
 	 * @param config configuration object
 	 * @param networkBufSize memory of network buffers based on JVM memory size and network fraction
-	 * @param maxJvmHeapMemory maximum memory used for checking the results of network memory
+	 * @param maxAllowedMemory maximum memory used for checking the results of network memory
 	 *
 	 * @return memory to use for network buffers (in bytes)
 	 */
-	private static long calculateNewNetworkBufferMemory(Configuration config, long networkBufSize, long maxJvmHeapMemory) {
+	private static long calculateNewNetworkBufferMemory(Configuration config, long networkBufSize, long maxAllowedMemory) {
 		float networkBufFraction = config.getFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
 		long networkBufMin = MemorySize.parse(config.getString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes();
 		long networkBufMax = MemorySize.parse(config.getString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes();
@@ -334,13 +337,13 @@ public class NettyShuffleEnvironmentConfiguration {
 
 		long networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin, networkBufSize));
 
-		ConfigurationParserUtils.checkConfigParameter(networkBufBytes < maxJvmHeapMemory,
+		ConfigurationParserUtils.checkConfigParameter(networkBufBytes < maxAllowedMemory,
 			"(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
 			"(" + NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
 				NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
 				NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
 			"Network buffer memory size too large: " + networkBufBytes + " >= " +
-				maxJvmHeapMemory + " (maximum JVM memory size)");
+				maxAllowedMemory + " (maximum JVM memory size)");
 
 		return networkBufBytes;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
index 84a08c1..aee4689 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
@@ -242,12 +242,19 @@ public class NettyShuffleEnvironmentConfigurationTest extends TestLogger {
 		config.setFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.2f);
 		assertEquals(800, TaskManagerServices.calculateHeapSizeMB(1000, config));
 
+		config.setFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.6f);
+		assertEquals(400, TaskManagerServices.calculateHeapSizeMB(1000, config));
+
 		config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);
 		config.setFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
 		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "10m"); // 10MB
 		assertEquals(890, TaskManagerServices.calculateHeapSizeMB(1000, config));
 
+		config.setFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.6f);
+		assertEquals(390, TaskManagerServices.calculateHeapSizeMB(1000, config));
+
 		config.removeConfig(TaskManagerOptions.MANAGED_MEMORY_SIZE); // use fraction of given memory
+		config.setFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
 		config.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 0.1f); // 10%
 		assertEquals(810, TaskManagerServices.calculateHeapSizeMB(1000, config));
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
index 3ad9a2e..e3afb87 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
@@ -59,11 +59,23 @@ public class NetworkBufferCalculationTest extends TestLogger {
 		assertEquals((200L << 20) + 3 /* slightly too many due to floating point imprecision */,
 			NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 800L << 20)); // 800MB
 
+		config = getConfig(
+			Long.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()),
+			TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
+			0.6f, networkBufMin, networkBufMax, MemoryType.HEAP);
+		assertEquals((600L << 20),
+			NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 400L << 20)); // 400MB
+
 		config = getConfig(10, TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
 			0.1f, networkBufMin, networkBufMax, MemoryType.OFF_HEAP);
 		assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
 			NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 890L << 20)); // 890MB
 
+		config = getConfig(10, TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
+				0.6f, networkBufMin, networkBufMax, MemoryType.OFF_HEAP);
+		assertEquals((590L << 20),
+			NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 400L << 20)); // 400MB
+
 		config = getConfig(0, 0.1f, 0.1f, networkBufMin, networkBufMax, MemoryType.OFF_HEAP);
 		assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
 			NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 810L << 20)); // 810MB