You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text version).
Posted to commits@flink.apache.org by fh...@apache.org on 2015/09/17 13:24:05 UTC

[2/9] flink git commit: [FLINK-2640] [yarn] integrate off-heap configuration

[FLINK-2640] [yarn] integrate off-heap configuration

This closes #1132


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/93c95b6a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/93c95b6a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/93c95b6a

Branch: refs/heads/master
Commit: 93c95b6a6f150a2c55dc387e4ef1d603b3ef3f22
Parents: 8ca853e
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Sep 15 11:04:34 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Sep 17 11:19:33 2015 +0200

----------------------------------------------------------------------
 .../flink/yarn/YARNSessionFIFOITCase.java       | 13 +++--
 .../apache/flink/yarn/ApplicationMaster.scala   |  5 +-
 .../flink/yarn/ApplicationMasterActor.scala     | 55 ++++++++++++++++++--
 3 files changed, 61 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/93c95b6a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 6f07d36..3d381a6 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -546,12 +546,15 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 			});
 			Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog);
 			content = FileUtils.readFileToString(jobmanagerLog);
-			// expecting 512 mb, because TM was started with 1024, we cut off 50% (NOT THE DEFAULT VALUE).
-			Assert.assertTrue("Expected string 'Starting TM with command=$JAVA_HOME/bin/java -Xms424m -Xmx424m' not found in JobManager log: '"+jobmanagerLog+"'",
-					content.contains("Starting TM with command=$JAVA_HOME/bin/java -Xms424m -Xmx424m"));
-			Assert.assertTrue("Expected string ' (2/2) (attempt #0) to ' not found in JobManager log." +
+			// TM was started with 1024 but we cut off 50% (NOT THE DEFAULT VALUE) and then divide
+			// between heap and off-heap memory (see {@link ApplicationMasterActor}).
+			String expected = "Starting TM with command=$JAVA_HOME/bin/java -Xms359m -Xmx359m -XX:MaxDirectMemorySize=65m";
+			Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log: '"+jobmanagerLog+"'",
+					content.contains(expected));
+			expected = " (2/2) (attempt #0) to ";
+			Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log." +
 							"This string checks that the job has been started with a parallelism of 2. Log contents: '"+jobmanagerLog+"'",
-					content.contains(" (2/2) (attempt #0) to "));
+					content.contains(expected));
 
 			// make sure the detached app is really finished.
 			LOG.info("Checking again that app has finished");

http://git-wip-us.apache.org/repos/asf/flink/blob/93c95b6a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index 1d1db7e..869c643 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -26,21 +26,20 @@ import org.apache.flink.client.CliFrontend
 import org.apache.flink.configuration.{GlobalConfiguration, Configuration, ConfigConstants}
 import org.apache.flink.runtime.StreamingMode
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.instance.AkkaActorGateway
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer
-import org.apache.flink.runtime.util.{StandaloneUtils, LeaderRetrievalUtils, EnvironmentInformation}
+import org.apache.flink.runtime.util.{StandaloneUtils, EnvironmentInformation}
 import org.apache.flink.runtime.webmonitor.WebMonitor
 import org.apache.flink.yarn.Messages.StartYarnSession
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.conf.YarnConfiguration
+import scala.collection.JavaConversions._
 
 
 import scala.io.Source
 
 object ApplicationMaster {
-  import scala.collection.JavaConversions._
 
   val LOG = Logger(getClass)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/93c95b6a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
index e99f8d2..4af4bcc 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
@@ -405,7 +405,8 @@ trait ApplicationMasterActor extends FlinkActor {
     Try {
       log.info("Start yarn session.")
       memoryPerTaskManager = env.get(FlinkYarnClient.ENV_TM_MEMORY).toInt
-      val heapLimit = Utils.calculateHeapSize(memoryPerTaskManager, flinkConfiguration)
+
+      val memoryLimit = Utils.calculateHeapSize(memoryPerTaskManager, flinkConfiguration)
 
       val applicationMasterHost = env.get(Environment.NM_HOST.key)
       require(applicationMasterHost != null, s"Application master (${Environment.NM_HOST} not set.")
@@ -500,7 +501,7 @@ trait ApplicationMasterActor extends FlinkActor {
       val hs = ApplicationMaster.hasStreamingMode(env)
       containerLaunchContext = Some(
         createContainerLaunchContext(
-          heapLimit,
+          memoryLimit,
           hasLogback,
           hasLog4j,
           yarnClientUsername,
@@ -550,7 +551,7 @@ trait ApplicationMasterActor extends FlinkActor {
   }
 
   private def createContainerLaunchContext(
-      heapLimit: Int,
+      memoryLimit: Int,
       hasLogback: Boolean,
       hasLog4j: Boolean,
       yarnClientUsername: String,
@@ -561,9 +562,11 @@ trait ApplicationMasterActor extends FlinkActor {
     log.info("Create container launch context.")
     val ctx = Records.newRecord(classOf[ContainerLaunchContext])
 
+    val (heapLimit, offHeapLimit) = calculateMemoryLimits(memoryLimit, streamingMode)
+
     val javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")
     val tmCommand = new StringBuilder(s"$$JAVA_HOME/bin/java -Xms${heapLimit}m " +
-      s"-Xmx${heapLimit}m $javaOpts")
+      s"-Xmx${heapLimit}m -XX:MaxDirectMemorySize=${offHeapLimit}m $javaOpts")
 
     if (hasLogback || hasLog4j) {
       tmCommand ++=
@@ -616,4 +619,48 @@ trait ApplicationMasterActor extends FlinkActor {
 
     ctx
   }
+
+  /**
+   * Calculate the correct JVM heap and off-heap memory limits.
+   * @param memoryLimit The maximum memory in megabytes.
+   * @param streamingMode True if this is a streaming cluster.
+   * @return A Tuple2 containing the heap and the offHeap limit in megabytes.
+   */
+  private def calculateMemoryLimits(memoryLimit: Long, streamingMode: Boolean): (Long, Long) = {
+
+    // The new config entry overrides the old one
+    val networkBufferSizeOld = flinkConfiguration.getLong(
+      ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
+      ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE)
+
+    val networkBufferSize = flinkConfiguration.getLong(
+      ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+      networkBufferSizeOld)
+
+    val numNetworkBuffers = flinkConfiguration.getLong(
+      ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
+      ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS)
+
+    // direct memory for Netty's off-heap buffers
+    val networkMemory = ((numNetworkBuffers * networkBufferSize) >> 20) + 1
+
+    val useOffHeap = flinkConfiguration.getBoolean(
+      ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)
+
+    if (useOffHeap && !streamingMode){
+      val fixedOffHeapSize = flinkConfiguration.getLong(
+        ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L)
+      if (fixedOffHeapSize > 0) {
+        (memoryLimit - fixedOffHeapSize - networkMemory, fixedOffHeapSize + networkMemory)
+      } else {
+        val fraction = flinkConfiguration.getFloat(
+          ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+          ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
+        val offHeapSize = (fraction * memoryLimit).toLong
+        (memoryLimit - offHeapSize - networkMemory, offHeapSize + networkMemory)
+      }
+    } else {
+      (memoryLimit - networkMemory, networkMemory)
+    }
+  }
 }