You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2019/07/30 10:09:42 UTC

[flink] 03/04: [hotfix][test] Converting fraction to double to improve the precision

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 351b303fe527b009447b69fd22f41ddaa079dac7
Author: Gao Yun <yu...@alibaba-inc.com>
AuthorDate: Thu Jul 11 20:33:37 2019 +0800

    [hotfix][test] Converting fraction to double to improve the precision
---
 .../taskmanager/NettyShuffleEnvironmentConfiguration.java    |  7 ++++++-
 .../runtime/taskexecutor/NetworkBufferCalculationTest.java   | 12 ++++++------
 2 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index 31c7ca2..8569334 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -260,7 +260,12 @@ public class NettyShuffleEnvironmentConfiguration {
 		//              = jvmTotal - Math.min(networkBufMax, Math.max(networkBufMin, jvmHeap * netFraction)
 		// jvmTotal = jvmHeapNoNet / (1.0 - networkBufFraction)
 		float networkBufFraction = config.getFloat(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
-		long networkBufSize = (long) (jvmHeapNoNet / (1.0 - networkBufFraction) * networkBufFraction);
+
+		// Converts to double for higher precision. Converting via string achieves higher precision for those
+		// numbers that cannot be represented precisely by float, like 0.4f.
+		double heapNoNetFraction = 1.0 - Double.valueOf(Float.toString(networkBufFraction));
+		long totalJavaMemorySize = (long) (jvmHeapNoNet / heapNoNetFraction);
+		long networkBufSize = (long) (totalJavaMemorySize * networkBufFraction);
 
 		// Do not need to check the maximum allowed memory since the computed total memory should always
 		// be larger than the computed network buffer memory as long as the fraction is less than 1.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
index e3afb87..ca83609 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
@@ -49,35 +49,35 @@ public class NetworkBufferCalculationTest extends TestLogger {
 			Long.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()),
 			TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
 			0.1f, networkBufMin, networkBufMax, MemoryType.HEAP);
-		assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
+		assertEquals(100L << 20,
 			NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 900L << 20)); // 900MB
 
 		config = getConfig(
 			Long.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()),
 			TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
 			0.2f, networkBufMin, networkBufMax, MemoryType.HEAP);
-		assertEquals((200L << 20) + 3 /* slightly too many due to floating point imprecision */,
+		assertEquals(200L << 20,
 			NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 800L << 20)); // 800MB
 
 		config = getConfig(
 			Long.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()),
 			TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
 			0.6f, networkBufMin, networkBufMax, MemoryType.HEAP);
-		assertEquals((600L << 20),
+		assertEquals(600L << 20,
 			NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 400L << 20)); // 400MB
 
 		config = getConfig(10, TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
 			0.1f, networkBufMin, networkBufMax, MemoryType.OFF_HEAP);
-		assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
+		assertEquals(100L << 20,
 			NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 890L << 20)); // 890MB
 
 		config = getConfig(10, TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
 				0.6f, networkBufMin, networkBufMax, MemoryType.OFF_HEAP);
-		assertEquals((590L << 20),
+		assertEquals(615L << 20,
 			NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 400L << 20)); // 400MB
 
 		config = getConfig(0, 0.1f, 0.1f, networkBufMin, networkBufMax, MemoryType.OFF_HEAP);
-		assertEquals((100L << 20) + 1 /* one too many due to floating point imprecision */,
+		assertEquals(100L << 20,
 			NettyShuffleEnvironmentConfiguration.calculateNewNetworkBufferMemory(config, 810L << 20)); // 810MB
 	}