You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/10/01 01:07:14 UTC

[2/2] flink git commit: [FLINK-2773] remove strict upper direct memory limit

[FLINK-2773] remove strict upper direct memory limit

Setting a strict upper limit for the direct memory size can cause
problems with the direct memory allocation of the Netty network stack,
leading to OutOfMemoryErrors.

This closes #1203.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/011cbbf7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/011cbbf7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/011cbbf7

Branch: refs/heads/master
Commit: 011cbbf7e4643234b1183b7ccc9c60d9f23a43ce
Parents: 1243d7b
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Sep 30 16:45:14 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Thu Oct 1 01:05:42 2015 +0200

----------------------------------------------------------------------
 flink-dist/src/main/flink-bin/bin/config.sh     | 19 ++----------
 .../src/main/flink-bin/bin/taskmanager.sh       | 13 +++-----
 .../flink/runtime/taskmanager/TaskManager.scala | 26 ++--------------
 .../flink/yarn/YARNSessionFIFOITCase.java       |  5 ++-
 .../flink/yarn/ApplicationMasterActor.scala     | 32 ++++++--------------
 5 files changed, 20 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/011cbbf7/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index f4f58f2..653944c 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -91,11 +91,6 @@ KEY_JOBM_MEM_SIZE="jobmanager.heap.mb"
 KEY_TASKM_MEM_SIZE="taskmanager.heap.mb"
 KEY_TASKM_MEM_MANAGED_SIZE="taskmanager.memory.size"
 KEY_TASKM_MEM_MANAGED_FRACTION="taskmanager.memory.fraction"
-KEY_TASKM_MEM_NETWORK_BUFFERS="taskmanager.network.numberOfBuffers"
-# BEGIN:deprecated
-KEY_TASKM_MEM_NETWORK_BUFFER_SIZE="taskmanager.network.bufferSizeInBytes"
-# END:deprecated
-KEY_TASKM_MEM_SEGMENT_SIZE="taskmanager.memory.segment-size"
 KEY_TASKM_OFFHEAP="taskmanager.memory.off-heap"
 
 KEY_ENV_PID_DIR="env.pid.dir"
@@ -198,22 +193,12 @@ fi
 
 # Define FLINK_TM_MEM_MANAGED_FRACTION if it is not already set
 if [ -z "${FLINK_TM_MEM_MANAGED_FRACTION}" ]; then
-    FLINK_TM_MEM_MANAGED_FRACTION=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_FRACTION} 0 "${YAML_CONF}")
-fi
-
-# Define FLINK_TM_MEM_NETWORK_SIZE if it is not already set
-if [ -z "${FLINK_TM_MEM_NETWORK_SIZE}" ]; then
-    BUFFER_SIZE=$(readFromConfig ${KEY_TASKM_MEM_SEGMENT_SIZE} "0" "${YAML_CONF}")
-    if [ "${BUFFER_SIZE}" -eq "0" ]; then
-        BUFFER_SIZE=$(readFromConfig ${KEY_TASKM_MEM_NETWORK_BUFFER_SIZE} "$((32 * 1024))" "${YAML_CONF}")
-    fi
-    NUM_BUFFERS=$(readFromConfig ${KEY_TASKM_MEM_NETWORK_BUFFERS} "2048" "${YAML_CONF}")
-    FLINK_TM_MEM_NETWORK_SIZE=$((((NUM_BUFFERS * BUFFER_SIZE) >> 20) + 1))
+    FLINK_TM_MEM_MANAGED_FRACTION=$(readFromConfig ${KEY_TASKM_MEM_MANAGED_FRACTION} 0.7 "${YAML_CONF}")
 fi
 
 # Define FLINK_TM_OFFHEAP if it is not already set
 if [ -z "${FLINK_TM_OFFHEAP}" ]; then
-    FLINK_TM_OFFHEAP=$(readFromConfig ${KEY_TASKM_OFFHEAP} 0 "${YAML_CONF}")
+    FLINK_TM_OFFHEAP=$(readFromConfig ${KEY_TASKM_OFFHEAP} "false" "${YAML_CONF}")
 fi
 
 if [ -z "${MAX_LOG_FILE_NUMBER}" ]; then

http://git-wip-us.apache.org/repos/asf/flink/blob/011cbbf7/flink-dist/src/main/flink-bin/bin/taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index 9b0eb86..f63285e 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -59,9 +59,8 @@ if [[ $STARTSTOP == "start" ]]; then
     if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
 
         TM_HEAP_SIZE=${FLINK_TM_HEAP}
-        TM_OFFHEAP_SIZE=0
-        # some space for Netty initialization
-        NETTY_BUFFERS=1
+        # This is an upper bound, much less direct memory will be used
+        TM_MAX_OFFHEAP_SIZE=${FLINK_TM_HEAP}
 
         if [[ "${STREAMINGMODE}" == "batch" ]] && useOffHeapMemory; then
             if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
@@ -70,7 +69,6 @@ if [[ $STARTSTOP == "start" ]]; then
                     echo "[ERROR] Configured TaskManager memory size ('${KEY_TASKM_MEM_SIZE}') must be larger than the managed memory size ('${KEY_TASKM_MEM_MANAGED_SIZE}')."
                     exit 1
                 fi
-                TM_OFFHEAP_SIZE=${FLINK_TM_MEM_MANAGED_SIZE}
                 TM_HEAP_SIZE=$((FLINK_TM_HEAP - FLINK_TM_MEM_MANAGED_SIZE))
             else
                 # We calculate the memory using a fraction of the total memory
@@ -79,13 +77,12 @@ if [[ $STARTSTOP == "start" ]]; then
                     exit 1
                 fi
                 # recalculate the JVM heap memory by taking the off-heap ratio into account
-                TM_OFFHEAP_SIZE=`printf '%.0f\n' $(bc -l <<< "${FLINK_TM_HEAP} * ${FLINK_TM_MEM_MANAGED_FRACTION}")`
-                TM_HEAP_SIZE=$((FLINK_TM_HEAP - TM_OFFHEAP_SIZE))
+                OFFHEAP_MANAGED_MEMORY_SIZE=`printf '%.0f\n' $(bc -l <<< "${FLINK_TM_HEAP} * ${FLINK_TM_MEM_MANAGED_FRACTION}")`
+                TM_HEAP_SIZE=$((FLINK_TM_HEAP - OFFHEAP_MANAGED_MEMORY_SIZE))
             fi
         fi
 
-        TM_HEAP_SIZE=$((TM_HEAP_SIZE - FLINK_TM_MEM_NETWORK_SIZE - NETTY_BUFFERS))
-        export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=$((TM_OFFHEAP_SIZE + FLINK_TM_MEM_NETWORK_SIZE + NETTY_BUFFERS))M"
+        export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}M"
 
     fi
 

http://git-wip-us.apache.org/repos/asf/flink/blob/011cbbf7/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 9c0bd7d..aba9c60 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1590,36 +1590,14 @@ object TaskManager {
           fraction).toLong
 
         LOG.info(s"Using $fraction of the currently free heap space for Flink managed " +
-          s" heap memory (${relativeMemSize >> 20} MB).")
+          s"heap memory (${relativeMemSize >> 20} MB).")
 
         relativeMemSize
       }
       else if (memType == MemoryType.OFF_HEAP) {
 
-        val networkBufferSizeNew = configuration.getLong(
-          ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
-          ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE)
-
-        val networkBufferSizeOld = configuration.getLong(
-          ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, -1)
-        val networkBufferSize =
-          if (networkBufferSizeNew != -1) {
-            networkBufferSizeNew
-          } else if (networkBufferSizeOld == -1) {
-            ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE
-          } else {
-            networkBufferSizeOld
-          }
-
-        val numNetworkBuffers = configuration.getLong(
-          ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
-          ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS)
-
-        // direct memory for Netty's off-heap buffers
-        val networkMemory = (numNetworkBuffers * networkBufferSize) + (1 << 20)
-
         // The maximum heap memory has been adjusted according to the fraction
-        val maxMemory = EnvironmentInformation.getMaxJvmHeapMemory() + networkMemory
+        val maxMemory = EnvironmentInformation.getMaxJvmHeapMemory()
         val directMemorySize = (maxMemory / (1.0 - fraction) * fraction).toLong
 
         LOG.info(s"Using $fraction of the maximum memory size for " +

http://git-wip-us.apache.org/repos/asf/flink/blob/011cbbf7/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 0d0a7f2..cd2bdc6 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -555,9 +555,8 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 			});
 			Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog);
 			content = FileUtils.readFileToString(jobmanagerLog);
-			// TM was started with 1024 but we cut off 50% (NOT THE DEFAULT VALUE) and then divide
-			// between heap and off-heap memory (see {@link ApplicationMasterActor}).
-			String expected = "Starting TM with command=$JAVA_HOME/bin/java -Xms359m -Xmx359m -XX:MaxDirectMemorySize=65m";
+			// TM was started with 1024 but we cut off 50% (NOT THE DEFAULT VALUE)
+			String expected = "Starting TM with command=$JAVA_HOME/bin/java -Xms424m -Xmx424m";
 			Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log: '"+jobmanagerLog+"'",
 					content.contains(expected));
 			expected = " (2/2) (attempt #0) to ";

http://git-wip-us.apache.org/repos/asf/flink/blob/011cbbf7/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
index 4af4bcc..59dc53d 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
@@ -562,11 +562,11 @@ trait ApplicationMasterActor extends FlinkActor {
     log.info("Create container launch context.")
     val ctx = Records.newRecord(classOf[ContainerLaunchContext])
 
-    val (heapLimit, offHeapLimit) = calculateMemoryLimits(memoryLimit, streamingMode)
+    val heapLimit = calculateMemoryLimits(memoryLimit, streamingMode)
 
     val javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")
     val tmCommand = new StringBuilder(s"$$JAVA_HOME/bin/java -Xms${heapLimit}m " +
-      s"-Xmx${heapLimit}m -XX:MaxDirectMemorySize=${offHeapLimit}m $javaOpts")
+      s"-Xmx${heapLimit}m -XX:MaxDirectMemorySize=${memoryLimit}m $javaOpts")
 
     if (hasLogback || hasLog4j) {
       tmCommand ++=
@@ -621,28 +621,12 @@ trait ApplicationMasterActor extends FlinkActor {
   }
 
   /**
-   * Calculate the correct JVM heap and off-heap memory limits.
+   * Calculate the correct JVM heap memory limit.
    * @param memoryLimit The maximum memory in megabytes.
    * @param streamingMode True if this is a streaming cluster.
    * @return A Tuple2 containing the heap and the offHeap limit in megabytes.
    */
-  private def calculateMemoryLimits(memoryLimit: Long, streamingMode: Boolean): (Long, Long) = {
-
-    // The new config entry overrides the old one
-    val networkBufferSizeOld = flinkConfiguration.getLong(
-      ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE)
-
-    val networkBufferSize = flinkConfiguration.getLong(
-      ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
-      networkBufferSizeOld)
-
-    val numNetworkBuffers = flinkConfiguration.getLong(
-      ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS)
-
-    // direct memory for Netty's off-heap buffers
-    val networkMemory = ((numNetworkBuffers * networkBufferSize) >> 20) + 1
+  private def calculateMemoryLimits(memoryLimit: Long, streamingMode: Boolean): Long = {
 
     val useOffHeap = flinkConfiguration.getBoolean(
       ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)
@@ -650,17 +634,19 @@ trait ApplicationMasterActor extends FlinkActor {
     if (useOffHeap && !streamingMode){
       val fixedOffHeapSize = flinkConfiguration.getLong(
         ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L)
+
       if (fixedOffHeapSize > 0) {
-        (memoryLimit - fixedOffHeapSize - networkMemory, fixedOffHeapSize + networkMemory)
+        memoryLimit - fixedOffHeapSize
       } else {
         val fraction = flinkConfiguration.getFloat(
           ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
           ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
         val offHeapSize = (fraction * memoryLimit).toLong
-        (memoryLimit - offHeapSize - networkMemory, offHeapSize + networkMemory)
+        memoryLimit - offHeapSize
       }
+
     } else {
-      (memoryLimit - networkMemory, networkMemory)
+      memoryLimit
     }
   }
 }