You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2019/07/30 10:09:42 UTC
[flink] 03/04: [hotfix][test] Converting fraction to double to
improve the precision
This is an automated email from the ASF dual-hosted git repository.
nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 351b303fe527b009447b69fd22f41ddaa079dac7
Author: Gao Yun <yu...@alibaba-inc.com>
AuthorDate: Thu Jul 11 20:33:37 2019 +0800
[hotfix][test] Converting fraction to double to improve the precision
---
.../taskmanager/NettyShuffleEnvironmentConfiguration.java | 7 ++++++-
.../runtime/taskexecutor/NetworkBufferCalculationTest.java | 12 ++++++------
2 files changed, 12 insertions(+), 7 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index 31c7ca2..8569334 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -260,7 +260,12 @@ public class NettyShuffleEnvironmentConfiguration {
// = jvmTotal - Math.min(networkBufMax, Math.max(networkBufMin, jvmHeap * netFraction)
// jvmTotal = jvmHeapNoNet / (1.0 - networkBufFraction)
float networkBufFraction = config.getFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
- long networkBufSize = (long) (jvmHeapNoNet / (1.0 - networkBufFraction) * networkBufFraction);
+
+ // Converts to double for higher precision. Converting via string achieve higher precision for those
+ // numbers can not be represented preciously by float, like 0.4f.
+ double heapNoNetFraction = 1.0 - Double.valueOf(Float.toString(networkBufFraction));
+ long totalJavaMemorySize = (long) (jvmHeapNoNet / heapNoNetFraction);
+ long networkBufSize = (long) (totalJavaMemorySize * networkBufFraction);
// Do not need to check the maximum allowed memory since the computed total memory should always
// be larger than the computed network buffer memory as long as the fraction is less than 1.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
index e3afb87..ca83609 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
@@ -49,35 +49,35 @@ public class NetworkBufferCalculationTest extends TestLogger {
Long.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()),
TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
0.1f, networkBufMin, networkBufMax, MemoryType.HEAP);
- assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
+ assertEquals(100L << 20,
NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 900L << 20)); // 900MB
config = getConfig(
Long.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()),
TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
0.2f, networkBufMin, networkBufMax, MemoryType.HEAP);
- assertEquals((200L << 20) + 3 /* slightly too many due to floating point imprecision */,
+ assertEquals(200L << 20,
NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 800L << 20)); // 800MB
config = getConfig(
Long.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()),
TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
0.6f, networkBufMin, networkBufMax, MemoryType.HEAP);
- assertEquals((600L << 20),
+ assertEquals(600L << 20,
NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 400L << 20)); // 400MB
config = getConfig(10, TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
0.1f, networkBufMin, networkBufMax, MemoryType.OFF_HEAP);
- assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
+ assertEquals(100L << 20,
NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 890L << 20)); // 890MB
config = getConfig(10, TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
0.6f, networkBufMin, networkBufMax, MemoryType.OFF_HEAP);
- assertEquals((590L << 20),
+ assertEquals(615L << 20,
NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 400L << 20)); // 400MB
config = getConfig(0, 0.1f, 0.1f, networkBufMin, networkBufMax, MemoryType.OFF_HEAP);
- assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
+ assertEquals(100L << 20,
NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 810L << 20)); // 810MB
}