You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2019/11/07 21:56:11 UTC

[flink] 20/21: [FLINK-14631] Account for netty direct allocations in direct memory limit (Netty Shuffle)

This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b8c6a4cdfab29447dc1cfc74d37c1a4c34217717
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Wed Nov 6 17:24:28 2019 +0100

    [FLINK-14631] Account for netty direct allocations in direct memory limit (Netty Shuffle)
    
    Since FLINK-13982, the calculation of the JVM direct memory limit accounts only for the memory segment network buffers, but not for the direct allocations made by the netty arenas in org.apache.flink.runtime.io.network.netty.NettyBufferPool. The netty arenas should be included in the shuffle memory calculations.
---
 .../test-scripts/test_batch_allround.sh            |  4 +--
 .../TaskExecutorResourceUtils.java                 |  6 +++-
 .../runtime/io/network/netty/NettyBufferPool.java  |  2 ++
 .../runtime/io/network/netty/NettyConfig.java      | 10 ++++--
 .../NettyShuffleEnvironmentConfiguration.java      | 39 ++++++++++++++++++----
 .../TaskExecutorResourceUtilsTest.java             |  6 +++-
 .../NettyShuffleEnvironmentConfigurationTest.java  | 16 +++++++++
 .../example/failing/JobSubmissionFailsITCase.java  |  6 +++-
 .../test/streaming/runtime/BackPressureITCase.java |  4 ++-
 .../apache/flink/yarn/YarnConfigurationITCase.java |  4 +--
 10 files changed, 79 insertions(+), 18 deletions(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
index 5143ef4..dc6f753 100755
--- a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
+++ b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
@@ -26,8 +26,8 @@ TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAll
 echo "Run DataSet-Allround-Test Program"
 
 # modify configuration to include spilling to disk
-set_config_key "taskmanager.network.memory.min" "10485760"
-set_config_key "taskmanager.network.memory.max" "10485760"
+set_config_key "taskmanager.network.memory.min" "27262976"
+set_config_key "taskmanager.network.memory.max" "27262976"
 
 set_conf_ssl "server"
 start_cluster
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
index 9c69b62..f98670b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
@@ -23,6 +23,8 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
@@ -375,7 +377,9 @@ public class TaskExecutorResourceUtils {
 		@SuppressWarnings("deprecation")
 		final long numOfBuffers = config.getInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS);
 		final long pageSize =  ConfigurationParserUtils.getPageSize(config);
-		return new MemorySize(numOfBuffers * pageSize);
+		final int numberOfSlots = config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
+		final long numberOfArenas = NettyConfig.getNumberOfArenas(config, numberOfSlots);
+		return new MemorySize(numOfBuffers * pageSize + numberOfArenas * NettyBufferPool.ARENA_SIZE);
 	}
 
 	private static RangeFraction getShuffleMemoryRangeFraction(final Configuration config) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java
index 6d2a6c8..fc49712 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java
@@ -67,6 +67,8 @@ public class NettyBufferPool extends PooledByteBufAllocator {
 	 */
 	private static final int MAX_ORDER = 11;
 
+	public static final long ARENA_SIZE = PAGE_SIZE << MAX_ORDER;
+
 	/**
 	 * Creates Netty's buffer pool with the specified number of direct arenas.
 	 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index 9730907..43708f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -102,9 +102,7 @@ public class NettyConfig {
 	}
 
 	public int getNumberOfArenas() {
-		// default: number of slots
-		final int configValue = config.getInteger(NettyShuffleEnvironmentOptions.NUM_ARENAS);
-		return configValue == -1 ? numberOfSlots : configValue;
+		return getNumberOfArenas(config, numberOfSlots);
 	}
 
 	public int getServerNumThreads() {
@@ -188,4 +186,10 @@ public class NettyConfig {
 				getClientConnectTimeoutSeconds(), getSendAndReceiveBufferSize(),
 				getSendAndReceiveBufferSize() == 0 ? def : man);
 	}
+
+	public static int getNumberOfArenas(Configuration config, int numberOfSlots) {
+		// default: number of slots
+		final int configValue = config.getInteger(NettyShuffleEnvironmentOptions.NUM_ARENAS);
+		return configValue == -1 ? numberOfSlots : configValue;
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index 7bb8a8e..a8d3926 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
@@ -184,10 +185,16 @@ public class NettyShuffleEnvironmentConfiguration {
 
 		final int pageSize = ConfigurationParserUtils.getPageSize(configuration);
 
-		final int numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(configuration, maxJvmHeapMemory, shuffleMemorySize, pageSize);
-
 		final NettyConfig nettyConfig = createNettyConfig(configuration, localTaskManagerCommunication, taskManagerAddress, dataport);
 
+		final int numberOfNettyArenas = nettyConfig != null ? nettyConfig.getNumberOfArenas() : 0;
+		final int numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(
+			configuration,
+			maxJvmHeapMemory,
+			shuffleMemorySize,
+			pageSize,
+			numberOfNettyArenas);
+
 		int initialRequestBackoff = configuration.getInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
 		int maxRequestBackoff = configuration.getInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX);
 
@@ -463,11 +470,12 @@ public class NettyShuffleEnvironmentConfiguration {
 		long maxJvmHeapMemory,
 		@Nullable // should only be null when flip49 is disabled
 		MemorySize shuffleMemorySize,
-		int pageSize) {
+		int pageSize,
+		int numberOfNettyArenas) {
 
 		final int numberOfNetworkBuffers;
 		if (shuffleMemorySize != null) { // flip49 enbaled
-			numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(shuffleMemorySize.getBytes(), pageSize);
+			numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(shuffleMemorySize.getBytes(), pageSize, numberOfNettyArenas);
 			logIfIgnoringOldConfigs(configuration);
 		} else if (!hasNewNetworkConfig(configuration)) {
 			// fallback: number of network buffers
@@ -478,7 +486,7 @@ public class NettyShuffleEnvironmentConfiguration {
 			logIfIgnoringOldConfigs(configuration);
 
 			final long networkMemorySize = calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
-			numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(networkMemorySize, pageSize);
+			numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(networkMemorySize, pageSize, 0);
 		}
 
 		return numberOfNetworkBuffers;
@@ -491,9 +499,26 @@ public class NettyShuffleEnvironmentConfiguration {
 		}
 	}
 
-	private static int calculateNumberOfNetworkBuffers(long networkMemorySizeByte, int pageSizeByte) {
+	private static int calculateNumberOfNetworkBuffers(
+		long networkMemorySizeByte,
+		int pageSizeByte,
+		int numberOfNettyArenas) {
+
+		long nettyArenasSizeBytes = numberOfNettyArenas * NettyBufferPool.ARENA_SIZE;
+		Preconditions.checkArgument(
+			networkMemorySizeByte >= nettyArenasSizeBytes,
+			String.format(
+				"Provided shuffle memory %d bytes is not enough for direct allocation of %d netty arenas " +
+					"('taskmanager.network.netty.num-arenas', by default 'taskmanager.numberOfTaskSlots'). " +
+					"Each arena size is %d bytes. Total configured arenas size is %d bytes. " +
+					"Try to increase shuffle memory size by adjusting 'taskmanager.memory.shuffle.*' Flink configuration options.",
+				networkMemorySizeByte,
+				numberOfNettyArenas,
+				NettyBufferPool.ARENA_SIZE,
+				nettyArenasSizeBytes));
+
 		// tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory)
-		long numberOfNetworkBuffersLong = networkMemorySizeByte / pageSizeByte;
+		long numberOfNetworkBuffersLong = (networkMemorySizeByte - nettyArenasSizeBytes) / pageSizeByte;
 		if (numberOfNetworkBuffersLong > Integer.MAX_VALUE) {
 			throw new IllegalArgumentException("The given number of memory bytes (" + networkMemorySizeByte
 				+ ") corresponds to more than MAX_INT pages.");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
index 79ea059..26528a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -201,13 +202,16 @@ public class TaskExecutorResourceUtilsTest extends TestLogger {
 	public void testConfigShuffleMemoryLegacyNumOfBuffers() {
 		final MemorySize pageSize = MemorySize.parse("32k");
 		final int numOfBuffers = 1024;
-		final MemorySize shuffleSize = pageSize.multiply(numOfBuffers);
+		final MemorySize arenaSize = MemorySize.parse(NettyBufferPool.ARENA_SIZE + "b");
+		final int numberOfNettyArenas = 10;
+		final MemorySize shuffleSize = pageSize.multiply(numOfBuffers).add(arenaSize.multiply(numberOfNettyArenas));
 
 		@SuppressWarnings("deprecation")
 		final ConfigOption<Integer> legacyOption = NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS;
 
 		Configuration conf = new Configuration();
 		conf.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, pageSize.getKibiBytes() + "k");
+		conf.setInteger(NettyShuffleEnvironmentOptions.NUM_ARENAS, numberOfNettyArenas);
 		conf.setInteger(legacyOption, numOfBuffers);
 
 		// validate in configurations without explicit total flink/process memory, otherwise explicit configured
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
index c4c4c0b..fc0035c 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
@@ -30,8 +30,10 @@ import org.junit.Test;
 import java.net.InetAddress;
 import java.util.Random;
 
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -42,6 +44,20 @@ public class NettyShuffleEnvironmentConfigurationTest extends TestLogger {
 
 	private static final long MEM_SIZE_PARAM = 128L * 1024 * 1024;
 
+	@Test
+	public void testNetworkBufferNumberCalculation() {
+		final Configuration config = new Configuration();
+		config.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "1m");
+		config.setInteger(NettyShuffleEnvironmentOptions.NUM_ARENAS, 4); // 4 x 16Mb = 64Mb
+		final int numNetworkBuffers = NettyShuffleEnvironmentConfiguration.fromConfiguration(
+			config,
+			MEM_SIZE_PARAM,
+			null,
+			true,
+			InetAddress.getLoopbackAddress()).numNetworkBuffers();
+		assertThat(numNetworkBuffers, is(64)); // (128Mb (total) - 64Mb (arenas)) / 1Mb (page) = 64
+	}
+
 	/**
 	 * Verifies that {@link  NettyShuffleEnvironmentConfiguration#fromConfiguration(Configuration, long, MemorySize, boolean, InetAddress)}
 	 * returns the correct result for new configurations via
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
index 22d291d..36050e8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
@@ -68,7 +68,11 @@ public class JobSubmissionFailsITCase extends TestLogger {
 
 	private static Configuration getConfiguration() {
 		Configuration config = new Configuration();
-		config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "4m");
+		config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
+
+		// to accommodate 10 netty arenas (NUM_SLOTS / NUM_TM) x 16Mb (NettyBufferPool.ARENA_SIZE)
+		config.setString(TaskManagerOptions.SHUFFLE_MEMORY_MIN, "256m");
+
 		return config;
 	}
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java
index e7c2072..970a74a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.minicluster.TestingMiniCluster;
@@ -94,7 +95,8 @@ public class BackPressureITCase extends TestLogger {
 		final Configuration configuration = new Configuration();
 
 		final int memorySegmentSizeKb = 32;
-		final String networkBuffersMemory = (memorySegmentSizeKb * NUM_TASKS) + "kb";
+		final long nettyArenaSizeKb = NettyBufferPool.ARENA_SIZE >> 10;
+		final String networkBuffersMemory = ((memorySegmentSizeKb + nettyArenaSizeKb) * NUM_TASKS) + "kb";
 
 		configuration.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, memorySegmentSizeKb + "kb");
 		configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, networkBuffersMemory);
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 36f90e4..35ab654 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -91,8 +91,8 @@ public class YarnConfigurationITCase extends YarnTestBase {
 
 			// disable heap cutoff min
 			configuration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 0);
-			configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(1L << 20));
-			configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(4L << 20));
+			configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(49L << 20));
+			configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(52L << 20));
 
 			final YarnConfiguration yarnConfiguration = getYarnConfiguration();
 			final YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(