You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2019/07/30 10:09:41 UTC
[flink] 02/04: [FLINK-12171][Network] Do not limit the network
buffer memory by heap size on the TM side
This is an automated email from the ASF dual-hosted git repository.
nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e457cd46fdb285813f08242d9df5fbe4c81f0b2b
Author: Gao Yun <yu...@alibaba-inc.com>
AuthorDate: Thu Jul 11 20:00:11 2019 +0800
[FLINK-12171][Network] Do not limit the network buffer memory by heap size on the TM side
---
.../TaskManagerHeapSizeCalculationJavaBashTest.java | 12 ++++++++++++
.../NettyShuffleEnvironmentConfiguration.java | 19 +++++++++++--------
.../NettyShuffleEnvironmentConfigurationTest.java | 7 +++++++
.../taskexecutor/NetworkBufferCalculationTest.java | 12 ++++++++++++
4 files changed, 42 insertions(+), 8 deletions(-)
diff --git a/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java b/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
index 44277c5..b965355 100755
--- a/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
+++ b/flink-dist/src/test/java/org/apache/flink/dist/TaskManagerHeapSizeCalculationJavaBashTest.java
@@ -94,6 +94,12 @@ public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
compareNetworkBufJavaVsScript(
getConfig(totalMemoryInMB, true, 0.1f, networkBufMin, networkBufMax, managedMemSize, 0.1f), 0.0f);
+ compareNetworkBufJavaVsScript(
+ getConfig(totalMemoryInMB, false, 0.6f, networkBufMin, networkBufMax, managedMemSize, managedMemFrac), 0.0f);
+
+ compareNetworkBufJavaVsScript(
+ getConfig(totalMemoryInMB, true, 0.6f, networkBufMin, networkBufMax, 10 /*MB*/, managedMemFrac), 0.0f);
+
// some automated tests with random (but valid) values
Random ran = new Random();
@@ -127,6 +133,12 @@ public class TaskManagerHeapSizeCalculationJavaBashTest extends TestLogger {
compareHeapSizeJavaVsScript(
getConfig(totalMemoryInMB, true, 0.1f, networkBufMin, networkBufMax, managedMemSize, 0.1f), 0.0f);
+ compareHeapSizeJavaVsScript(
+ getConfig(totalMemoryInMB, false, 0.6f, networkBufMin, networkBufMax, managedMemSize, managedMemFrac), 0.0f);
+
+ compareHeapSizeJavaVsScript(
+ getConfig(totalMemoryInMB, true, 0.6f, networkBufMin, networkBufMax, 10 /*MB*/, managedMemFrac), 0.0f);
+
// some automated tests with random (but valid) values
Random ran = new Random();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index e73cc6a..31c7ca2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -256,12 +256,15 @@ public class NettyShuffleEnvironmentConfiguration {
}
// finally extract the network buffer memory size again from:
- // jvmHeapNoNet = jvmHeap - networkBufBytes
- // = jvmHeap - Math.min(networkBufMax, Math.max(networkBufMin, jvmHeap * netFraction)
- // jvmHeap = jvmHeapNoNet / (1.0 - networkBufFraction)
+ // jvmHeapNoNet = jvmTotal - networkBufBytes
+ // = jvmTotal - Math.min(networkBufMax, Math.max(networkBufMin, jvmHeap * netFraction)
+ // jvmTotal = jvmHeapNoNet / (1.0 - networkBufFraction)
float networkBufFraction = config.getFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
long networkBufSize = (long) (jvmHeapNoNet / (1.0 - networkBufFraction) * networkBufFraction);
- return calculateNewNetworkBufferMemory(config, networkBufSize, maxJvmHeapMemory);
+
+ // Do not need to check the maximum allowed memory since the computed total memory should always
+ // be larger than the computed network buffer memory as long as the fraction is less than 1.
+ return calculateNewNetworkBufferMemory(config, networkBufSize, Long.MAX_VALUE);
}
/**
@@ -319,11 +322,11 @@ public class NettyShuffleEnvironmentConfiguration {
*
* @param config configuration object
* @param networkBufSize memory of network buffers based on JVM memory size and network fraction
- * @param maxJvmHeapMemory maximum memory used for checking the results of network memory
+ * @param maxAllowedMemory maximum memory used for checking the results of network memory
*
* @return memory to use for network buffers (in bytes)
*/
- private static long calculateNewNetworkBufferMemory(Configuration config, long networkBufSize, long maxJvmHeapMemory) {
+ private static long calculateNewNetworkBufferMemory(Configuration config, long networkBufSize, long maxAllowedMemory) {
float networkBufFraction = config.getFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
long networkBufMin = MemorySize.parse(config.getString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes();
long networkBufMax = MemorySize.parse(config.getString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes();
@@ -334,13 +337,13 @@ public class NettyShuffleEnvironmentConfiguration {
long networkBufBytes = Math.min(networkBufMax, Math.max(networkBufMin, networkBufSize));
- ConfigurationParserUtils.checkConfigParameter(networkBufBytes < maxJvmHeapMemory,
+ ConfigurationParserUtils.checkConfigParameter(networkBufBytes < maxAllowedMemory,
"(" + networkBufFraction + ", " + networkBufMin + ", " + networkBufMax + ")",
"(" + NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
"Network buffer memory size too large: " + networkBufBytes + " >= " +
- maxJvmHeapMemory + " (maximum JVM memory size)");
+ maxAllowedMemory + " (maximum JVM memory size)");
return networkBufBytes;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
index 84a08c1..aee4689 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
@@ -242,12 +242,19 @@ public class NettyShuffleEnvironmentConfigurationTest extends TestLogger {
config.setFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.2f);
assertEquals(800, TaskManagerServices.calculateHeapSizeMB(1000, config));
+ config.setFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.6f);
+ assertEquals(400, TaskManagerServices.calculateHeapSizeMB(1000, config));
+
config.setBoolean(TaskManagerOptions.MEMORY_OFF_HEAP, true);
config.setFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "10m"); // 10MB
assertEquals(890, TaskManagerServices.calculateHeapSizeMB(1000, config));
+ config.setFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.6f);
+ assertEquals(390, TaskManagerServices.calculateHeapSizeMB(1000, config));
+
config.removeConfig(TaskManagerOptions.MANAGED_MEMORY_SIZE); // use fraction of given memory
+ config.setFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION, 0.1f);
config.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 0.1f); // 10%
assertEquals(810, TaskManagerServices.calculateHeapSizeMB(1000, config));
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
index 3ad9a2e..e3afb87 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
@@ -59,11 +59,23 @@ public class NetworkBufferCalculationTest extends TestLogger {
assertEquals((200L << 20) + 3 /* slightly too many due to floating point imprecision */,
NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 800L << 20)); // 800MB
+ config = getConfig(
+ Long.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()),
+ TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
+ 0.6f, networkBufMin, networkBufMax, MemoryType.HEAP);
+ assertEquals((600L << 20),
+ NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 400L << 20)); // 400MB
+
config = getConfig(10, TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
0.1f, networkBufMin, networkBufMax, MemoryType.OFF_HEAP);
assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 890L << 20)); // 890MB
+ config = getConfig(10, TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
+ 0.6f, networkBufMin, networkBufMax, MemoryType.OFF_HEAP);
+ assertEquals((590L << 20),
+ NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 400L << 20)); // 400MB
+
config = getConfig(0, 0.1f, 0.1f, networkBufMin, networkBufMax, MemoryType.OFF_HEAP);
assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 810L << 20)); // 810MB