You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/09/17 13:24:05 UTC
[2/9] flink git commit: [FLINK-2640] [yarn] integrate off-heap
configuration
[FLINK-2640] [yarn] integrate off-heap configuration
This closes #1132
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/93c95b6a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/93c95b6a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/93c95b6a
Branch: refs/heads/master
Commit: 93c95b6a6f150a2c55dc387e4ef1d603b3ef3f22
Parents: 8ca853e
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Sep 15 11:04:34 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Sep 17 11:19:33 2015 +0200
----------------------------------------------------------------------
.../flink/yarn/YARNSessionFIFOITCase.java | 13 +++--
.../apache/flink/yarn/ApplicationMaster.scala | 5 +-
.../flink/yarn/ApplicationMasterActor.scala | 55 ++++++++++++++++++--
3 files changed, 61 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/93c95b6a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 6f07d36..3d381a6 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -546,12 +546,15 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
});
Assert.assertNotNull("Unable to locate JobManager log", jobmanagerLog);
content = FileUtils.readFileToString(jobmanagerLog);
- // expecting 512 mb, because TM was started with 1024, we cut off 50% (NOT THE DEFAULT VALUE).
- Assert.assertTrue("Expected string 'Starting TM with command=$JAVA_HOME/bin/java -Xms424m -Xmx424m' not found in JobManager log: '"+jobmanagerLog+"'",
- content.contains("Starting TM with command=$JAVA_HOME/bin/java -Xms424m -Xmx424m"));
- Assert.assertTrue("Expected string ' (2/2) (attempt #0) to ' not found in JobManager log." +
+ // TM was started with 1024 but we cut off 50% (NOT THE DEFAULT VALUE) and then divide
+ // between heap and off-heap memory (see {@link ApplicationMasterActor}).
+ String expected = "Starting TM with command=$JAVA_HOME/bin/java -Xms359m -Xmx359m -XX:MaxDirectMemorySize=65m";
+ Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log: '"+jobmanagerLog+"'",
+ content.contains(expected));
+ expected = " (2/2) (attempt #0) to ";
+ Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log." +
"This string checks that the job has been started with a parallelism of 2. Log contents: '"+jobmanagerLog+"'",
- content.contains(" (2/2) (attempt #0) to "));
+ content.contains(expected));
// make sure the detached app is really finished.
LOG.info("Checking again that app has finished");
http://git-wip-us.apache.org/repos/asf/flink/blob/93c95b6a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index 1d1db7e..869c643 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -26,21 +26,20 @@ import org.apache.flink.client.CliFrontend
import org.apache.flink.configuration.{GlobalConfiguration, Configuration, ConfigConstants}
import org.apache.flink.runtime.StreamingMode
import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.instance.AkkaActorGateway
import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.jobmanager.web.WebInfoServer
-import org.apache.flink.runtime.util.{StandaloneUtils, LeaderRetrievalUtils, EnvironmentInformation}
+import org.apache.flink.runtime.util.{StandaloneUtils, EnvironmentInformation}
import org.apache.flink.runtime.webmonitor.WebMonitor
import org.apache.flink.yarn.Messages.StartYarnSession
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.conf.YarnConfiguration
+import scala.collection.JavaConversions._
import scala.io.Source
object ApplicationMaster {
- import scala.collection.JavaConversions._
val LOG = Logger(getClass)
http://git-wip-us.apache.org/repos/asf/flink/blob/93c95b6a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
index e99f8d2..4af4bcc 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
@@ -405,7 +405,8 @@ trait ApplicationMasterActor extends FlinkActor {
Try {
log.info("Start yarn session.")
memoryPerTaskManager = env.get(FlinkYarnClient.ENV_TM_MEMORY).toInt
- val heapLimit = Utils.calculateHeapSize(memoryPerTaskManager, flinkConfiguration)
+
+ val memoryLimit = Utils.calculateHeapSize(memoryPerTaskManager, flinkConfiguration)
val applicationMasterHost = env.get(Environment.NM_HOST.key)
require(applicationMasterHost != null, s"Application master (${Environment.NM_HOST} not set.")
@@ -500,7 +501,7 @@ trait ApplicationMasterActor extends FlinkActor {
val hs = ApplicationMaster.hasStreamingMode(env)
containerLaunchContext = Some(
createContainerLaunchContext(
- heapLimit,
+ memoryLimit,
hasLogback,
hasLog4j,
yarnClientUsername,
@@ -550,7 +551,7 @@ trait ApplicationMasterActor extends FlinkActor {
}
private def createContainerLaunchContext(
- heapLimit: Int,
+ memoryLimit: Int,
hasLogback: Boolean,
hasLog4j: Boolean,
yarnClientUsername: String,
@@ -561,9 +562,11 @@ trait ApplicationMasterActor extends FlinkActor {
log.info("Create container launch context.")
val ctx = Records.newRecord(classOf[ContainerLaunchContext])
+ val (heapLimit, offHeapLimit) = calculateMemoryLimits(memoryLimit, streamingMode)
+
val javaOpts = flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "")
val tmCommand = new StringBuilder(s"$$JAVA_HOME/bin/java -Xms${heapLimit}m " +
- s"-Xmx${heapLimit}m $javaOpts")
+ s"-Xmx${heapLimit}m -XX:MaxDirectMemorySize=${offHeapLimit}m $javaOpts")
if (hasLogback || hasLog4j) {
tmCommand ++=
@@ -616,4 +619,48 @@ trait ApplicationMasterActor extends FlinkActor {
ctx
}
+
+ /**
+ * Calculate the correct JVM heap and off-heap memory limits.
+ *
+ * The given limit is split into a heap part and an off-heap (direct memory) part. The
+ * off-heap part always contains the memory reserved for the network buffers, computed as
+ * `((number of buffers * buffer size) >> 20) + 1`. If off-heap memory is enabled in the
+ * configuration and the cluster is not in streaming mode, the off-heap part additionally
+ * contains the managed memory: the configured fixed size if that value is positive,
+ * otherwise the configured fraction of `memoryLimit` (truncated to a whole number).
+ * The heap part is whatever remains of `memoryLimit` after subtracting the off-heap part.
+ *
+ * NOTE(review): nothing in this method checks that the resulting heap limit is positive;
+ * a fixed off-heap size plus network memory larger than `memoryLimit` yields a negative
+ * heap value. Confirm whether callers or the configuration guard against this.
+ *
+ * @param memoryLimit The maximum memory in megabytes.
+ * @param streamingMode True if this is a streaming cluster.
+ * @return A Tuple2 containing the heap and the offHeap limit in megabytes.
+ */
+ private def calculateMemoryLimits(memoryLimit: Long, streamingMode: Boolean): (Long, Long) = {
+
+ // The new config entry overrides the old one: the old key's value is only used as the default below
+ val networkBufferSizeOld = flinkConfiguration.getLong(
+ ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE)
+
+ val networkBufferSize = flinkConfiguration.getLong(
+ ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
+ networkBufferSizeOld)
+
+ val numNetworkBuffers = flinkConfiguration.getLong(
+ ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
+ ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS)
+
+ // direct memory for Netty's off-heap buffers; >> 20 divides by 2^20 (presumably bytes to megabytes), plus one
+ val networkMemory = ((numNetworkBuffers * networkBufferSize) >> 20) + 1
+
+ val useOffHeap = flinkConfiguration.getBoolean(
+ ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)
+
+ if (useOffHeap && !streamingMode){
+ val fixedOffHeapSize = flinkConfiguration.getLong(
+ ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L) // -1 if no fixed size is configured
+ if (fixedOffHeapSize > 0) {
+ (memoryLimit - fixedOffHeapSize - networkMemory, fixedOffHeapSize + networkMemory)
+ } else {
+ val fraction = flinkConfiguration.getFloat(
+ ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
+ ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION)
+ val offHeapSize = (fraction * memoryLimit).toLong // fractional part is dropped
+ (memoryLimit - offHeapSize - networkMemory, offHeapSize + networkMemory)
+ }
+ } else {
+ (memoryLimit - networkMemory, networkMemory) // only the network buffers are off-heap
+ }
+ }
}