You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2019/11/07 21:56:11 UTC
[flink] 20/21: [FLINK-14631] Account for netty direct allocations in direct memory limit (Netty Shuffle)
This is an automated email from the ASF dual-hosted git repository.
azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b8c6a4cdfab29447dc1cfc74d37c1a4c34217717
Author: Andrey Zagrebin <az...@apache.org>
AuthorDate: Wed Nov 6 17:24:28 2019 +0100
[FLINK-14631] Account for netty direct allocations in direct memory limit (Netty Shuffle)
Currently (after FLINK-13982), when we calculate the JVM direct memory limit, we account only for the memory segment network buffers, not for the direct allocations made by netty arenas in org.apache.flink.runtime.io.network.netty.NettyBufferPool. We should include the netty arenas in the shuffle memory calculation.
---
.../test-scripts/test_batch_allround.sh | 4 +--
.../TaskExecutorResourceUtils.java | 6 +++-
.../runtime/io/network/netty/NettyBufferPool.java | 2 ++
.../runtime/io/network/netty/NettyConfig.java | 10 ++++--
.../NettyShuffleEnvironmentConfiguration.java | 39 ++++++++++++++++++----
.../TaskExecutorResourceUtilsTest.java | 6 +++-
.../NettyShuffleEnvironmentConfigurationTest.java | 16 +++++++++
.../example/failing/JobSubmissionFailsITCase.java | 6 +++-
.../test/streaming/runtime/BackPressureITCase.java | 4 ++-
.../apache/flink/yarn/YarnConfigurationITCase.java | 4 +--
10 files changed, 79 insertions(+), 18 deletions(-)
diff --git a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
index 5143ef4..dc6f753 100755
--- a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
+++ b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
@@ -26,8 +26,8 @@ TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAll
echo "Run DataSet-Allround-Test Program"
# modify configuration to include spilling to disk
-set_config_key "taskmanager.network.memory.min" "10485760"
-set_config_key "taskmanager.network.memory.max" "10485760"
+set_config_key "taskmanager.network.memory.min" "27262976"
+set_config_key "taskmanager.network.memory.max" "27262976"
set_conf_ssl "server"
start_cluster
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
index 9c69b62..f98670b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtils.java
@@ -23,6 +23,8 @@ import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -375,7 +377,9 @@ public class TaskExecutorResourceUtils {
@SuppressWarnings("deprecation")
final long numOfBuffers = config.getInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS);
final long pageSize = ConfigurationParserUtils.getPageSize(config);
- return new MemorySize(numOfBuffers * pageSize);
+ final int numberOfSlots = config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
+ final long numberOfArenas = NettyConfig.getNumberOfArenas(config, numberOfSlots);
+ return new MemorySize(numOfBuffers * pageSize + numberOfArenas * NettyBufferPool.ARENA_SIZE);
}
private static RangeFraction getShuffleMemoryRangeFraction(final Configuration config) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java
index 6d2a6c8..fc49712 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java
@@ -67,6 +67,8 @@ public class NettyBufferPool extends PooledByteBufAllocator {
*/
private static final int MAX_ORDER = 11;
+ public static final long ARENA_SIZE = PAGE_SIZE << MAX_ORDER;
+
/**
* Creates Netty's buffer pool with the specified number of direct arenas.
*
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index 9730907..43708f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -102,9 +102,7 @@ public class NettyConfig {
}
public int getNumberOfArenas() {
- // default: number of slots
- final int configValue = config.getInteger(NettyShuffleEnvironmentOptions.NUM_ARENAS);
- return configValue == -1 ? numberOfSlots : configValue;
+ return getNumberOfArenas(config, numberOfSlots);
}
public int getServerNumThreads() {
@@ -188,4 +186,10 @@ public class NettyConfig {
getClientConnectTimeoutSeconds(), getSendAndReceiveBufferSize(),
getSendAndReceiveBufferSize() == 0 ? def : man);
}
+
+ public static int getNumberOfArenas(Configuration config, int numberOfSlots) {
+ // default: number of slots
+ final int configValue = config.getInteger(NettyShuffleEnvironmentOptions.NUM_ARENAS);
+ return configValue == -1 ? numberOfSlots : configValue;
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index 7bb8a8e..a8d3926 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
@@ -184,10 +185,16 @@ public class NettyShuffleEnvironmentConfiguration {
final int pageSize = ConfigurationParserUtils.getPageSize(configuration);
- final int numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(configuration, maxJvmHeapMemory, shuffleMemorySize, pageSize);
-
final NettyConfig nettyConfig = createNettyConfig(configuration, localTaskManagerCommunication, taskManagerAddress, dataport);
+ final int numberOfNettyArenas = nettyConfig != null ? nettyConfig.getNumberOfArenas() : 0;
+ final int numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(
+ configuration,
+ maxJvmHeapMemory,
+ shuffleMemorySize,
+ pageSize,
+ numberOfNettyArenas);
+
int initialRequestBackoff = configuration.getInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
int maxRequestBackoff = configuration.getInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX);
@@ -463,11 +470,12 @@ public class NettyShuffleEnvironmentConfiguration {
long maxJvmHeapMemory,
@Nullable // should only be null when flip49 is disabled
MemorySize shuffleMemorySize,
- int pageSize) {
+ int pageSize,
+ int numberOfNettyArenas) {
final int numberOfNetworkBuffers;
if (shuffleMemorySize != null) { // flip49 enbaled
- numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(shuffleMemorySize.getBytes(), pageSize);
+ numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(shuffleMemorySize.getBytes(), pageSize, numberOfNettyArenas);
logIfIgnoringOldConfigs(configuration);
} else if (!hasNewNetworkConfig(configuration)) {
// fallback: number of network buffers
@@ -478,7 +486,7 @@ public class NettyShuffleEnvironmentConfiguration {
logIfIgnoringOldConfigs(configuration);
final long networkMemorySize = calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
- numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(networkMemorySize, pageSize);
+ numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(networkMemorySize, pageSize, 0);
}
return numberOfNetworkBuffers;
@@ -491,9 +499,26 @@ public class NettyShuffleEnvironmentConfiguration {
}
}
- private static int calculateNumberOfNetworkBuffers(long networkMemorySizeByte, int pageSizeByte) {
+ private static int calculateNumberOfNetworkBuffers(
+ long networkMemorySizeByte,
+ int pageSizeByte,
+ int numberOfNettyArenas) {
+
+ long nettyArenasSizeBytes = numberOfNettyArenas * NettyBufferPool.ARENA_SIZE;
+ Preconditions.checkArgument(
+ networkMemorySizeByte >= nettyArenasSizeBytes,
+ String.format(
+ "Provided shuffle memory %d bytes is not enough for direct allocation of %d netty arenas " +
+ "('taskmanager.network.netty.num-arenas', by default 'taskmanager.numberOfTaskSlots'). " +
+ "Each arena size is %d bytes. Total configured arenas size is %d bytes. " +
+ "Try to increase shuffle memory size by adjusting 'taskmanager.memory.shuffle.*' Flink configuration options.",
+ networkMemorySizeByte,
+ numberOfNettyArenas,
+ NettyBufferPool.ARENA_SIZE,
+ nettyArenasSizeBytes));
+
// tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory)
- long numberOfNetworkBuffersLong = networkMemorySizeByte / pageSizeByte;
+ long numberOfNetworkBuffersLong = (networkMemorySizeByte - nettyArenasSizeBytes) / pageSizeByte;
if (numberOfNetworkBuffersLong > Integer.MAX_VALUE) {
throw new IllegalArgumentException("The given number of memory bytes (" + networkMemorySizeByte
+ ") corresponds to more than MAX_INT pages.");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
index 79ea059..26528a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceUtilsTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -201,13 +202,16 @@ public class TaskExecutorResourceUtilsTest extends TestLogger {
public void testConfigShuffleMemoryLegacyNumOfBuffers() {
final MemorySize pageSize = MemorySize.parse("32k");
final int numOfBuffers = 1024;
- final MemorySize shuffleSize = pageSize.multiply(numOfBuffers);
+ final MemorySize arenaSize = MemorySize.parse(NettyBufferPool.ARENA_SIZE + "b");
+ final int numberOfNettyArenas = 10;
+ final MemorySize shuffleSize = pageSize.multiply(numOfBuffers).add(arenaSize.multiply(numberOfNettyArenas));
@SuppressWarnings("deprecation")
final ConfigOption<Integer> legacyOption = NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS;
Configuration conf = new Configuration();
conf.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, pageSize.getKibiBytes() + "k");
+ conf.setInteger(NettyShuffleEnvironmentOptions.NUM_ARENAS, numberOfNettyArenas);
conf.setInteger(legacyOption, numOfBuffers);
// validate in configurations without explicit total flink/process memory, otherwise explicit configured
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
index c4c4c0b..fc0035c 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NettyShuffleEnvironmentConfigurationTest.java
@@ -30,8 +30,10 @@ import org.junit.Test;
import java.net.InetAddress;
import java.util.Random;
+import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -42,6 +44,20 @@ public class NettyShuffleEnvironmentConfigurationTest extends TestLogger {
private static final long MEM_SIZE_PARAM = 128L * 1024 * 1024;
+ @Test
+ public void testNetworkBufferNumberCalculation() {
+ final Configuration config = new Configuration();
+ config.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "1m");
+ config.setInteger(NettyShuffleEnvironmentOptions.NUM_ARENAS, 4); // 4 x 16Mb = 64Mb
+ final int numNetworkBuffers = NettyShuffleEnvironmentConfiguration.fromConfiguration(
+ config,
+ MEM_SIZE_PARAM,
+ null,
+ true,
+ InetAddress.getLoopbackAddress()).numNetworkBuffers();
+ assertThat(numNetworkBuffers, is(64)); // 128Mb (total) - 64Mb (arenas) / 1Mb (page) = 64
+ }
+
/**
* Verifies that {@link NettyShuffleEnvironmentConfiguration#fromConfiguration(Configuration, long, MemorySize, boolean, InetAddress)}
* returns the correct result for new configurations via
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
index 22d291d..36050e8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
@@ -68,7 +68,11 @@ public class JobSubmissionFailsITCase extends TestLogger {
private static Configuration getConfiguration() {
Configuration config = new Configuration();
- config.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, "4m");
+ config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
+
+ // to accommodate for 10 netty arenas (NUM_SLOTS / NUM_TM) x 16Mb (NettyBufferPool.ARENA_SIZE)
+ config.setString(TaskManagerOptions.SHUFFLE_MEMORY_MIN, "256m");
+
return config;
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java
index e7c2072..970a74a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BackPressureITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
@@ -94,7 +95,8 @@ public class BackPressureITCase extends TestLogger {
final Configuration configuration = new Configuration();
final int memorySegmentSizeKb = 32;
- final String networkBuffersMemory = (memorySegmentSizeKb * NUM_TASKS) + "kb";
+ final long nettyArenaSizeKb = NettyBufferPool.ARENA_SIZE >> 10;
+ final String networkBuffersMemory = ((memorySegmentSizeKb + nettyArenaSizeKb) * NUM_TASKS) + "kb";
configuration.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, memorySegmentSizeKb + "kb");
configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, networkBuffersMemory);
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 36f90e4..35ab654 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -91,8 +91,8 @@ public class YarnConfigurationITCase extends YarnTestBase {
// disable heap cutoff min
configuration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 0);
- configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(1L << 20));
- configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(4L << 20));
+ configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(49L << 20));
+ configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(52L << 20));
final YarnConfiguration yarnConfiguration = getYarnConfiguration();
final YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(