You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2019/11/07 21:55:56 UTC

[flink] 05/21: [FLINK-13983][runtime][yarn/mesos] Launches TaskExecutors on Yarn/Mesos with JVM parameters and dynamic configs generated from TaskExecutorResourceSpec.

This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5b3ebcf00ac8d925fca99967a772154956abfae5
Author: Xintong Song <to...@gmail.com>
AuthorDate: Fri Sep 27 17:12:10 2019 +0800

    [FLINK-13983][runtime][yarn/mesos] Launches TaskExecutors on Yarn/Mesos with JVM parameters and dynamic configs generated from TaskExecutorResourceSpec.
---
 .../clusterframework/LaunchableMesosWorker.java    | 23 +++++++++++---
 .../MesosTaskManagerParameters.java                |  9 ++++++
 .../clusterframework/MesosResourceManagerTest.java |  2 +-
 .../runtime/clusterframework/BootstrapTools.java   | 12 ++++++--
 .../ContaineredTaskManagerParameters.java          | 19 ++++++++++--
 .../clusterframework/TaskExecutorResourceSpec.java | 16 +++++++++-
 .../runtime/resourcemanager/ResourceManager.java   | 12 ++++++++
 .../clusterframework/BootstrapToolsTest.java       | 35 ++++++++++++++++++++-
 .../ContaineredTaskManagerParametersTest.java      |  6 ++--
 .../test/java/org/apache/flink/yarn/UtilsTest.java |  2 +-
 .../apache/flink/yarn/YarnConfigurationITCase.java |  1 +
 .../org/apache/flink/yarn/YarnResourceManager.java | 36 ++++++++++++++++++----
 12 files changed, 151 insertions(+), 22 deletions(-)

diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 895c66a..8a9f93a 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -28,6 +28,7 @@ import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.mesos.util.MesosResourceAllocation;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.util.Preconditions;
 
 import com.netflix.fenzo.ConstraintEvaluator;
@@ -135,7 +136,11 @@ public class LaunchableMesosWorker implements LaunchableTask {
 
 		@Override
 		public double getMemory() {
-			return params.containeredParameters().taskManagerTotalMemoryMB();
+			if (params.containeredParameters().getTaskExecutorResourceSpec() == null) { // flip49 disabled
+				return params.containeredParameters().taskManagerTotalMemoryMB();
+			} else {
+				return params.containeredParameters().getTaskExecutorResourceSpec().getTotalProcessMemorySize().getMebiBytes();
+			}
 		}
 
 		@Override
@@ -275,10 +280,15 @@ public class LaunchableMesosWorker implements LaunchableTask {
 		env.addVariables(variable(MesosConfigKeys.ENV_FLINK_CONTAINER_ID, taskInfo.getTaskId().getValue()));
 
 		// finalize the memory parameters
-		jvmArgs.append(" -Xms").append(tmParams.taskManagerHeapSizeMB()).append("m");
-		jvmArgs.append(" -Xmx").append(tmParams.taskManagerHeapSizeMB()).append("m");
-		if (tmParams.taskManagerDirectMemoryLimitMB() >= 0) {
-			jvmArgs.append(" -XX:MaxDirectMemorySize=").append(tmParams.taskManagerDirectMemoryLimitMB()).append("m");
+		if (tmParams.getTaskExecutorResourceSpec() == null) { // flip49 disabled
+			jvmArgs.append(" -Xms").append(tmParams.taskManagerHeapSizeMB()).append("m");
+			jvmArgs.append(" -Xmx").append(tmParams.taskManagerHeapSizeMB()).append("m");
+			if (tmParams.taskManagerDirectMemoryLimitMB() >= 0) {
+				jvmArgs.append(" -XX:MaxDirectMemorySize=").append(tmParams.taskManagerDirectMemoryLimitMB()).append(
+					"m");
+			}
+		} else { // flip49 enabled
+			jvmArgs.append(" ").append(TaskExecutorResourceUtils.generateJvmParametersStr(tmParams.getTaskExecutorResourceSpec()));
 		}
 
 		// pass dynamic system properties
@@ -301,6 +311,9 @@ public class LaunchableMesosWorker implements LaunchableTask {
 			.append(params.command())
 			.append(" ")
 			.append(ContainerSpecification.formatSystemProperties(dynamicProperties));
+		if (tmParams.getTaskExecutorResourceSpec() != null) { // flip49 enabled
+			launchCommand.append(" ").append(TaskExecutorResourceUtils.generateDynamicConfigsStr(tmParams.getTaskExecutorResourceSpec()));
+		}
 		cmd.setValue(launchCommand.toString());
 
 		// build the container info
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index 1d49000..fcad191 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -24,6 +24,8 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.description.Description;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.util.Preconditions;
 
 import com.netflix.fenzo.ConstraintEvaluator;
@@ -333,9 +335,16 @@ public class MesosTaskManagerParameters {
 	public static MesosTaskManagerParameters create(Configuration flinkConfig) {
 
 		List<ConstraintEvaluator> constraints = parseConstraints(flinkConfig.getString(MESOS_CONSTRAINTS_HARD_HOSTATTR));
+		TaskExecutorResourceSpec taskExecutorResourceSpec;
+		if (flinkConfig.getBoolean(TaskManagerOptions.ENABLE_FLIP_49_CONFIG)) {
+			taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(flinkConfig);
+		} else {
+			taskExecutorResourceSpec = null;
+		}
 		// parse the common parameters
 		ContaineredTaskManagerParameters containeredParameters = ContaineredTaskManagerParameters.create(
 			flinkConfig,
+			taskExecutorResourceSpec,
 			flinkConfig.getInteger(MESOS_RM_TASKS_MEMORY_MB),
 			flinkConfig.getInteger(MESOS_RM_TASKS_SLOTS));
 
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index b2eb177..177bf2f 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -271,7 +271,7 @@ public class MesosResourceManagerTest extends TestLogger {
 			// TaskExecutor templating
 			ContainerSpecification containerSpecification = new ContainerSpecification();
 			ContaineredTaskManagerParameters containeredParams =
-				new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>());
+				new ContaineredTaskManagerParameters(null, 1024, 768, 256, 4, new HashMap<String, String>());
 			MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(
 				1.0, 1, 0, MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), containeredParams,
 				Collections.<Protos.Volume>emptyList(), Collections.<Protos.Parameter>emptyList(), false,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 0b156ca..e18edf3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -395,7 +395,12 @@ public class BootstrapTools {
 				tmParams.taskManagerDirectMemoryLimitMB()));
 		}
 
-		startCommandValues.put("jvmmem", StringUtils.join(params, ' '));
+		final TaskExecutorResourceSpec taskExecutorResourceSpec = tmParams.getTaskExecutorResourceSpec();
+		if (taskExecutorResourceSpec == null) { // FLIP-49 disabled
+			startCommandValues.put("jvmmem", StringUtils.join(params, ' '));
+		} else { // FLIP-49 enabled
+			startCommandValues.put("jvmmem", TaskExecutorResourceUtils.generateJvmParametersStr(taskExecutorResourceSpec));
+		}
 
 		String javaOpts = flinkConfig.getString(CoreOptions.FLINK_JVM_OPTIONS);
 		if (flinkConfig.getString(CoreOptions.FLINK_TM_JVM_OPTIONS).length() > 0) {
@@ -427,7 +432,10 @@ public class BootstrapTools {
 		startCommandValues.put("redirects",
 			"1> " + logDirectory + "/taskmanager.out " +
 			"2> " + logDirectory + "/taskmanager.err");
-		startCommandValues.put("args", "--configDir " + configDirectory);
+
+		String argsStr = taskExecutorResourceSpec == null ? "" :
+			TaskExecutorResourceUtils.generateDynamicConfigsStr(taskExecutorResourceSpec) + " ";
+		startCommandValues.put("args", argsStr + "--configDir " + configDirectory);
 
 		final String commandTemplate = flinkConfig
 			.getString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
index a4e7d25..af8b7fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ResourceManagerOptions;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -48,13 +49,19 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
 	/** Environment variables to add to the Java process. */
 	private final HashMap<String, String> taskManagerEnv;
 
+	@Nullable // should only be null when flip49 is disabled
+	private final TaskExecutorResourceSpec taskExecutorResourceSpec;
+
 	public ContaineredTaskManagerParameters(
+			@Nullable // should only be null when flip49 is disabled
+			TaskExecutorResourceSpec taskExecutorResourceSpec,
 			long totalContainerMemoryMB,
 			long taskManagerHeapSizeMB,
 			long taskManagerDirectMemoryLimitMB,
 			int numSlots,
 			HashMap<String, String> taskManagerEnv) {
 
+		this.taskExecutorResourceSpec = taskExecutorResourceSpec;
 		this.totalContainerMemoryMB = totalContainerMemoryMB;
 		this.taskManagerHeapSizeMB = taskManagerHeapSizeMB;
 		this.taskManagerDirectMemoryLimitMB = taskManagerDirectMemoryLimitMB;
@@ -64,6 +71,11 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
 
 	// ------------------------------------------------------------------------
 
+	@Nullable // should only be null when flip49 is disabled
+	public TaskExecutorResourceSpec getTaskExecutorResourceSpec() {
+		return taskExecutorResourceSpec;
+	}
+
 	public long taskManagerTotalMemoryMB() {
 		return totalContainerMemoryMB;
 	}
@@ -90,7 +102,8 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
 	@Override
 	public String toString() {
 		return "TaskManagerParameters {" +
-			"totalContainerMemory=" + totalContainerMemoryMB +
+			"taskExecutorResourceSpec=" + (taskExecutorResourceSpec == null ? "null" : taskExecutorResourceSpec) +
+			", totalContainerMemory=" + totalContainerMemoryMB +
 			", taskManagerHeapSize=" + taskManagerHeapSizeMB +
 			", taskManagerDirectMemoryLimit=" + taskManagerDirectMemoryLimitMB +
 			", numSlots=" + numSlots +
@@ -151,6 +164,8 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
 	 */
 	public static ContaineredTaskManagerParameters create(
 			Configuration config,
+			@Nullable // should only be null when flip49 is disabled
+			TaskExecutorResourceSpec taskExecutorResourceSpec,
 			long containerMemoryMB,
 			int numSlots) {
 		// (1) try to compute how much memory used by container
@@ -175,6 +190,6 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
 
 		// done
 		return new ContaineredTaskManagerParameters(
-			containerMemoryMB, heapSizeMB, offHeapSizeMB, numSlots, envVars);
+			taskExecutorResourceSpec, containerMemoryMB, heapSizeMB, offHeapSizeMB, numSlots, envVars);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java
index 0813aea..d6cbe5b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java
@@ -74,7 +74,7 @@ import org.apache.flink.configuration.MemorySize;
  *               └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
  * </pre>
  */
-public class TaskExecutorResourceSpec {
+public class TaskExecutorResourceSpec implements java.io.Serializable {
 
 	private final MemorySize frameworkHeapSize;
 
@@ -155,4 +155,18 @@ public class TaskExecutorResourceSpec {
 	public MemorySize getTotalProcessMemorySize() {
 		return getTotalFlinkMemorySize().add(jvmMetaspaceSize).add(jvmOverheadSize);
 	}
+
+	@Override
+	public String toString() {
+		return "TaskExecutorResourceSpec {"
+			+ "frameworkHeapSize=" + frameworkHeapSize.toString()
+			+ ", taskHeapSize=" + taskHeapSize.toString()
+			+ ", taskOffHeapSize=" + taskOffHeapSize.toString()
+			+ ", shuffleMemSize=" + shuffleMemSize.toString()
+			+ ", onHeapManagedMemorySize=" + onHeapManagedMemorySize.toString()
+			+ ", offHeapManagedMemorySize=" + offHeapManagedMemorySize.toString()
+			+ ", jvmMetaspaceSize=" + jvmMetaspaceSize.toString()
+			+ ", jvmOverheadSize=" + jvmOverheadSize.toString()
+			+ "}";
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index c89a88d..a2aa9ac 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -27,6 +27,8 @@ import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
@@ -1210,5 +1212,15 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		final ResourceProfile resourceProfile = TaskManagerServices.computeSlotResourceProfile(numSlots, managedMemoryBytes);
 		return Collections.nCopies(numSlots, resourceProfile);
 	}
+
+	@Nullable // should only be null when flip49 is disabled
+	public static TaskExecutorResourceSpec createTaskExecutorResourceSpec(Configuration config) {
+		final boolean enableFlip49 = config.getBoolean(TaskManagerOptions.ENABLE_FLIP_49_CONFIG);
+		if (enableFlip49) {
+			return TaskExecutorResourceUtils.resourceSpecFromConfig(config);
+		} else {
+			return null;
+		}
+	}
 }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
index 04ad29c..3cc3168 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.clusterframework;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.util.ExceptionUtils;
@@ -143,8 +144,40 @@ public class BootstrapToolsTest extends TestLogger {
 	@Test
 	public void testGetTaskManagerShellCommand() {
 		final Configuration cfg = new Configuration();
+		final TaskExecutorResourceSpec taskExecutorResourceSpec = new TaskExecutorResourceSpec(
+			new MemorySize(0), // frameworkHeapSize
+			new MemorySize(111), // taskHeapSize
+			new MemorySize(0), // taskOffHeapSize
+			new MemorySize(222), // shuffleMemSize
+			new MemorySize(0), // onHeapManagedMemorySize
+			new MemorySize(0), // offHeapManagedMemorySize
+			new MemorySize(333), // jvmMetaspaceSize
+			new MemorySize(0)); // jvmOverheadSize
+		final ContaineredTaskManagerParameters containeredParams = new ContaineredTaskManagerParameters(
+			taskExecutorResourceSpec, 1024, 768, 256, 4, new HashMap<String, String>());
+
+		// no logging, with/out krb5
+		final String java = "$JAVA_HOME/bin/java";
+		final String jvmmem = "-Xmx111 -Xms111 -XX:MaxDirectMemorySize=222 -XX:MaxMetaspaceSize=333";
+		final String mainClass =
+			"org.apache.flink.runtime.clusterframework.BootstrapToolsTest";
+		final String dynamicConfigs = TaskExecutorResourceUtils.generateDynamicConfigsStr(taskExecutorResourceSpec).trim();
+		final String args = dynamicConfigs + " --configDir ./conf";
+		final String redirects =
+			"1> ./logs/taskmanager.out 2> ./logs/taskmanager.err";
+
+		assertEquals(
+			java + " " + jvmmem + " " + mainClass + " " + args + " " + redirects,
+			BootstrapTools
+				.getTaskManagerShellCommand(cfg, containeredParams, "./conf", "./logs",
+					false, false, false, this.getClass()));
+	}
+
+	@Test
+	public void testGetTaskManagerShellCommandLegacy() {
+		final Configuration cfg = new Configuration();
 		final ContaineredTaskManagerParameters containeredParams =
-			new ContaineredTaskManagerParameters(1024, 768, 256, 4,
+			new ContaineredTaskManagerParameters(null, 1024, 768, 256, 4,
 				new HashMap<String, String>());
 
 		// no logging, with/out krb5
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
index a1f4cad..7fe09fd 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
@@ -45,7 +45,7 @@ public class ContaineredTaskManagerParametersTest extends TestLogger {
 		Configuration conf = new Configuration();
 
 		ContaineredTaskManagerParameters params =
-			ContaineredTaskManagerParameters.create(conf, CONTAINER_MEMORY, 1);
+			ContaineredTaskManagerParameters.create(conf, null, CONTAINER_MEMORY, 1);
 
 		final float memoryCutoffRatio = conf.getFloat(
 			ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO,
@@ -80,7 +80,7 @@ public class ContaineredTaskManagerParametersTest extends TestLogger {
 		conf.setBoolean(MEMORY_OFF_HEAP, false);
 
 		ContaineredTaskManagerParameters params =
-			ContaineredTaskManagerParameters.create(conf, CONTAINER_MEMORY, 1);
+			ContaineredTaskManagerParameters.create(conf, null, CONTAINER_MEMORY, 1);
 
 		assertTrue(params.taskManagerDirectMemoryLimitMB() > 0L);
 
@@ -98,7 +98,7 @@ public class ContaineredTaskManagerParametersTest extends TestLogger {
 		conf.setBoolean(MEMORY_OFF_HEAP, true);
 
 		ContaineredTaskManagerParameters params =
-			ContaineredTaskManagerParameters.create(conf, CONTAINER_MEMORY, 1);
+			ContaineredTaskManagerParameters.create(conf, null, CONTAINER_MEMORY, 1);
 
 		assertTrue(params.taskManagerDirectMemoryLimitMB() > 0L);
 
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
index 3a3144e..8f74aa1 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -187,7 +187,7 @@ public class UtilsTest extends TestLogger {
 			hdfsDelegationTokenKind, service));
 		amCredentials.writeTokenStorageFile(new org.apache.hadoop.fs.Path(credentialFile.getAbsolutePath()), yarnConf);
 
-		ContaineredTaskManagerParameters tmParams = new ContaineredTaskManagerParameters(64,
+		ContaineredTaskManagerParameters tmParams = new ContaineredTaskManagerParameters(null, 64,
 			64, 16, 1, new HashMap<>(1));
 		Configuration taskManagerConf = new Configuration();
 
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 2aa476a..8350293 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -177,6 +177,7 @@ public class YarnConfigurationITCase extends YarnTestBase {
 
 					final ContaineredTaskManagerParameters containeredTaskManagerParameters = ContaineredTaskManagerParameters.create(
 						configuration,
+						null,
 						taskManagerMemory,
 						slotsPerTaskManager);
 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 3a70086..366a029 100755
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -26,6 +26,8 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -129,6 +131,9 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 
 	private final Resource resource;
 
+	@Nullable // should only be null when flip49 is disabled
+	private final TaskExecutorResourceSpec taskExecutorResourceSpec;
+
 	public YarnResourceManager(
 			RpcService rpcService,
 			String resourceManagerEndpointId,
@@ -176,7 +181,14 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 
 		this.webInterfaceUrl = webInterfaceUrl;
 		this.numberOfTaskSlots = flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
-		this.defaultTaskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(flinkConfig).getMebiBytes();
+
+		this.taskExecutorResourceSpec = createTaskExecutorResourceSpec(flinkConfig);
+		if (taskExecutorResourceSpec != null) { // FLIP-49 enabled
+			final TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(flinkConfig);
+			this.defaultTaskManagerMemoryMB = taskExecutorResourceSpec.getTotalProcessMemorySize().getMebiBytes();
+		} else { // FLIP-49 disabled
+			this.defaultTaskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(flinkConfig).getMebiBytes();
+		}
 		this.defaultCpus = flinkConfig.getInteger(YarnConfigOptions.VCORES, numberOfTaskSlots);
 		this.resource = Resource.newInstance(defaultTaskManagerMemoryMB, defaultCpus);
 
@@ -390,6 +402,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 					try {
 						// Context information used to start a TaskExecutor Java process
 						ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
+							taskExecutorResourceSpec,
 							container.getResource(),
 							containerIdStr,
 							container.getNodeId().getHost());
@@ -534,20 +547,31 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 			RM_REQUEST_PRIORITY);
 	}
 
-	private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host)
-			throws Exception {
+	private ContainerLaunchContext createTaskExecutorLaunchContext(
+		@Nullable // should only be null when flip49 is disabled
+		TaskExecutorResourceSpec tmResourceSpec,
+		Resource resource,
+		String containerId,
+		String host) throws Exception {
+
 		// init the ContainerLaunchContext
 		final String currDir = env.get(ApplicationConstants.Environment.PWD.key());
 
 		final ContaineredTaskManagerParameters taskManagerParameters =
-				ContaineredTaskManagerParameters.create(flinkConfig, resource.getMemory(), numberOfTaskSlots);
+				ContaineredTaskManagerParameters.create(flinkConfig, tmResourceSpec, resource.getMemory(), numberOfTaskSlots);
 
-		log.debug("TaskExecutor {} will be started with container size {} MB, JVM heap size {} MB, " +
-				"JVM direct memory limit {} MB",
+		if (tmResourceSpec == null) { // FLIP-49 disabled
+			log.debug("TaskExecutor {} will be started with container size {} MB, JVM heap size {} MB, " +
+					"JVM direct memory limit {} MB",
 				containerId,
 				taskManagerParameters.taskManagerTotalMemoryMB(),
 				taskManagerParameters.taskManagerHeapSizeMB(),
 				taskManagerParameters.taskManagerDirectMemoryLimitMB());
+		} else { // FLIP-49 enabled
+			log.debug("TaskExecutor {} will be started with {}.",
+				containerId,
+				tmResourceSpec);
+		}
 
 		Configuration taskManagerConfig = BootstrapTools.cloneConfiguration(flinkConfig);