You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/10/01 01:07:14 UTC
[2/2] flink git commit: [FLINK-2773] remove strict upper direct memory limit
[FLINK-2773] remove strict upper direct memory limit
Setting a strict upper limit on the direct memory size can cause
problems with the direct memory allocation of the Netty network stack,
leading to OutOfMemoryErrors.
This closes #1203.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/011cbbf7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/011cbbf7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/011cbbf7
Branch: refs/heads/master
Commit: 011cbbf7e4643234b1183b7ccc9c60d9f23a43ce
Parents: 1243d7b
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Sep 30 16:45:14 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Thu Oct 1 01:05:42 2015 +0200
----------------------------------------------------------------------
flink-dist/src/main/flink-bin/bin/config.sh | 19 ++----------
.../src/main/flink-bin/bin/taskmanager.sh | 13 +++-----
.../flink/runtime/taskmanager/TaskManager.scala | 26 ++--------------
.../flink/yarn/YARNSessionFIFOITCase.java | 5 ++-
.../flink/yarn/ApplicationMasterActor.scala | 32 ++++++--------------
5 files changed, 20 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/011cbbf7/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index f4f58f2..653944c 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -91,11 +91,6 @@ KEY_JOBM_MEM_SIZE="jobmanager.heap.mb"
KEY_TASKM_MEM_SIZE="taskmanager.heap.mb"
KEY_TASKM_MEM_MANAGED_SIZE="taskmanager.memory.size"
KEY_TASKM_MEM_MANAGED_FRACTION="taskmanager.memory.fraction"
-KEY_TASKM_MEM_NETWORK_BUFFERS="taskmanager.network.numberOfBuffers"
-# BEGIN:deprecated
-KEY_TASKM_MEM_NETWORK_BUFFER_SIZE="taskmanager.network.bufferSizeInBytes"
-# END:deprecated
-KEY_TASKM_MEM_SEGMENT_SIZE="taskmanager.memory.segment-size"
KEY_TASKM_OFFHEAP="taskmanager.memory.off-heap"
KEY_ENV_PID_DIR="env.pid.dir"
@@ -198,22 +193,12 @@ fi
# Define FLINK_TM_MEM_MANAGED_FRACTION if it is not already set
if [ -z "${FLINK_TM_MEM_MANAGED_FRACTION}" ]; then
- FLINK_TM_MEM_MANAGED_FRACTION=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_FRACTION} 0 "${YAML_CONF}")
-fi
-
-# Define FLINK_TM_MEM_NETWORK_SIZE if it is not already set
-if [ -z "${FLINK_TM_MEM_NETWORK_SIZE}" ]; then
- BUFFER_SIZE=$(readFromConfig ${KEY_TASKM_MEM_SEGMENT_SIZE} "0" "${YAML_CONF}")
- if [ "${BUFFER_SIZE}" -eq "0" ]; then
- BUFFER_SIZE=$(readFromConfig ${KEY_TASKM_MEM_NETWORK_BUFFER_SIZE} "$((32 * 1024))" "${YAML_CONF}")
- fi
- NUM_BUFFERS=$(readFromConfig ${KEY_TASKM_MEM_NETWORK_BUFFERS} "2048" "${YAML_CONF}")
- FLINK_TM_MEM_NETWORK_SIZE=$((((NUM_BUFFERS * BUFFER_SIZE) >> 20) + 1))
+ FLINK_TM_MEM_MANAGED_FRACTION=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_FRACTION} 0.7 "${YAML_CONF}")
fi
# Define FLINK_TM_OFFHEAP if it is not already set
if [ -z "${FLINK_TM_OFFHEAP}" ]; then
- FLINK_TM_OFFHEAP=$(readFromConfig ${KEY_TASKM_OFFHEAP} 0 "${YAML_CONF}")
+ FLINK_TM_OFFHEAP=$(readFromConfig ${KEY_TASKM_OFFHEAP} "false" "${YAML_CONF}")
fi
if [ -z "${MAX_LOG_FILE_NUMBER}" ]; then
http://git-wip-us.apache.org/repos/asf/flink/blob/011cbbf7/flink-dist/src/main/flink-bin/bin/taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index 9b0eb86..f63285e 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -59,9 +59,8 @@ if [[ $STARTSTOP == "start" ]]; then
if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
TM_HEAP_SIZE=${FLINK_TM_HEAP}
- TM_OFFHEAP_SIZE=0
- # some space for Netty initialization
- NETTY_BUFFERS=1
+ # This is an upper bound, much less direct memory will be used
+ TM_MAX_OFFHEAP_SIZE=${FLINK_TM_HEAP}
if [[ "${STREAMINGMODE}" == "batch" ]] && useOffHeapMemory; then
if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
@@ -70,7 +69,6 @@ if [[ $STARTSTOP == "start" ]]; then
echo "[ERROR] Configured TaskManager memory size ('${KEY_TASKM_MEM_SIZE}') must be larger than the managed memory size ('${KEY_TASKM_MEM_MANAGED_SIZE}')."
exit 1
fi
- TM_OFFHEAP_SIZE=${FLINK_TM_MEM_MANAGED_SIZE}
TM_HEAP_SIZE=$((FLINK_TM_HEAP - FLINK_TM_MEM_MANAGED_SIZE))
else
# We calculate the memory using a fraction of the total memory
@@ -79,13 +77,12 @@ if [[ $STARTSTOP == "start" ]]; then
exit 1
fi
# recalculate the JVM heap memory by taking the off-heap ratio into account
- TM_OFFHEAP_SIZE=`printf '%.0f\n' $(bc -l <<< "${FLINK_TM_HEAP} * ${FLINK_TM_MEM_MANAGED_FRACTION}")`
- TM_HEAP_SIZE=$((FLINK_TM_HEAP - TM_OFFHEAP_SIZE))
+ OFFHEAP_MANAGED_MEMORY_SIZE=`printf '%.0f\n' $(bc -l <<< "${FLINK_TM_HEAP} * ${FLINK_TM_MEM_MANAGED_FRACTION}")`
+ TM_HEAP_SIZE=$((FLINK_TM_HEAP - OFFHEAP_MANAGED_MEMORY_SIZE))
fi
fi
- TM_HEAP_SIZE=$((TM_HEAP_SIZE - FLINK_TM_MEM_NETWORK_SIZE - NETTY_BUFFERS))
- export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=$((TM_OFFHEAP_SIZE + FLINK_TM_MEM_NETWORK_SIZE + NETTY_BUFFERS))M"
+ export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}M"
fi
http://git-wip-us.apache.org/repos/asf/flink/blob/011cbbf7/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 9c0bd7d..aba9c60 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1590,36 +1590,14 @@ object TaskManager {
fraction).toLong
LOG.info(s"Using $fraction of the currently free heap space for Flink managed " +
- s" heap memory (${relativeMemSize >> 20} MB).")
+ s"heap memory (${relativeMemSize >> 20} MB).")
relativeMemSize
}
else if (memType == MemoryType.OFF_HEAP) {
- val networkBufferSizeNew = configuration.getLong(
- ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE)
-
- val networkBufferSizeOld = configuration.getLong(
- ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, -1)
- val networkBufferSize =
- if (networkBufferSizeNew != -1) {
- networkBufferSizeNew
- } else if (networkBufferSizeOld == -1) {
- ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE
- } else {
- networkBufferSizeOld
- }
-
- val numNetworkBuffers = configuration.getLong(
- ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS)
-
- // direct memory for Netty's off-heap buffers
- val networkMemory = (numNetworkBuffers * networkBufferSize) + (1 << 20)
-
// The maximum heap memory has been adjusted according to the fraction
- val maxMemory = EnvironmentInformation.getMaxJvmHeapMemory() + networkMemory
+ val maxMemory = EnvironmentInformation.getMaxJvmHeapMemory()
val directMemorySize = (maxMemory / (1.0 - fraction) * fraction).toLong
LOG.info(s"Using $fraction of the maximum memory size for " +
http://git-wip-us.apache.org/repos/asf/flink/blob/011cbbf7/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 0d0a7f2..cd2bdc6 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -555,9 +555,8 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
});
Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog);
content = FileUtils.readFileToString(jobmanagerLog);
- // TM was started with 1024 but we cut off 50% (NOT THE DEFAULT VALUE) and then divide
- // between heap and off-heap memory (see {@link ApplicationMasterActor}).
- String expected = "Starting TM with command=$JAVA_HOME/bin/java -Xms359m -Xmx359m -XX:MaxDirectMemorySize=65m";
+ // TM was started with 1024 but we cut off 50% (NOT THE DEFAULT VALUE)
+ String expected = "Starting TM with command=$JAVA_HOME/bin/java -Xms424m -Xmx424m";
Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log: '"+jobmanagerLog+"'",
content.contains(expected));
expected = " (2/2) (attempt #0) to ";
http://git-wip-us.apache.org/repos/asf/flink/blob/011cbbf7/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
index 4af4bcc..59dc53d 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
@@ -562,11 +562,11 @@ trait ApplicationMasterActor extends FlinkActor {
log.info("Create container launch context.")
val ctx = Records.newRecord(classOf[ContainerLaunchContext])
- val (heapLimit, offHeapLimit) = calculateMemoryLimits(memoryLimit, streamingMode)
+ val heapLimit = calculateMemoryLimits(memoryLimit, streamingMode)
val javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")
val tmCommand = new StringBuilder(s"$$JAVA_HOME/bin/java -Xms${heapLimit}m " +
- s"-Xmx${heapLimit}m -XX:MaxDirectMemorySize=${offHeapLimit}m $javaOpts")
+ s"-Xmx${heapLimit}m -XX:MaxDirectMemorySize=${memoryLimit}m $javaOpts")
if (hasLogback || hasLog4j) {
tmCommand ++=
@@ -621,28 +621,12 @@ trait ApplicationMasterActor extends FlinkActor {
}
/**
- * Calculate the correct JVM heap and off-heap memory limits.
+ * Calculate the correct JVM heap memory limit.
* @param memoryLimit The maximum memory in megabytes.
* @param streamingMode True if this is a streaming cluster.
* @return A Tuple2 containing the heap and the offHeap limit in megabytes.
*/
- private def calculateMemoryLimits(memoryLimit: Long, streamingMode: Boolean): (Long, Long) = {
-
- // The new config entry overrides the old one
- val networkBufferSizeOld = flinkConfiguration.getLong(
- ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE)
-
- val networkBufferSize = flinkConfiguration.getLong(
- ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
- networkBufferSizeOld)
-
- val numNetworkBuffers = flinkConfiguration.getLong(
- ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
- ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS)
-
- // direct memory for Netty's off-heap buffers
- val networkMemory = ((numNetworkBuffers * networkBufferSize) >> 20) + 1
+ private def calculateMemoryLimits(memoryLimit: Long, streamingMode: Boolean): Long = {
val useOffHeap = flinkConfiguration.getBoolean(
ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)
@@ -650,17 +634,19 @@ trait ApplicationMasterActor extends FlinkActor {
if (useOffHeap && !streamingMode){
val fixedOffHeapSize = flinkConfiguration.getLong(
ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L)
+
if (fixedOffHeapSize > 0) {
- (memoryLimit - fixedOffHeapSize - networkMemory, fixedOffHeapSize + networkMemory)
+ memoryLimit - fixedOffHeapSize
} else {
val fraction = flinkConfiguration.getFloat(
ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
val offHeapSize = (fraction * memoryLimit).toLong
- (memoryLimit - offHeapSize - networkMemory, offHeapSize + networkMemory)
+ memoryLimit - offHeapSize
}
+
} else {
- (memoryLimit - networkMemory, networkMemory)
+ memoryLimit
}
}
}