You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by az...@apache.org on 2019/11/07 21:55:56 UTC
[flink] 05/21: [FLINK-13983][runtime][yarn/mesos] Launches
TaskExecutors on Yarn/Mesos with JVM parameters and dynamic configs
generated from TaskExecutorResourceSpec.
This is an automated email from the ASF dual-hosted git repository.
azagrebin pushed a commit to branch FLINK-13986-flip49-cleanup-e2e
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5b3ebcf00ac8d925fca99967a772154956abfae5
Author: Xintong Song <to...@gmail.com>
AuthorDate: Fri Sep 27 17:12:10 2019 +0800
[FLINK-13983][runtime][yarn/mesos] Launches TaskExecutors on Yarn/Mesos with JVM parameters and dynamic configs generated from TaskExecutorResourceSpec.
---
.../clusterframework/LaunchableMesosWorker.java | 23 +++++++++++---
.../MesosTaskManagerParameters.java | 9 ++++++
.../clusterframework/MesosResourceManagerTest.java | 2 +-
.../runtime/clusterframework/BootstrapTools.java | 12 ++++++--
.../ContaineredTaskManagerParameters.java | 19 ++++++++++--
.../clusterframework/TaskExecutorResourceSpec.java | 16 +++++++++-
.../runtime/resourcemanager/ResourceManager.java | 12 ++++++++
.../clusterframework/BootstrapToolsTest.java | 35 ++++++++++++++++++++-
.../ContaineredTaskManagerParametersTest.java | 6 ++--
.../test/java/org/apache/flink/yarn/UtilsTest.java | 2 +-
.../apache/flink/yarn/YarnConfigurationITCase.java | 1 +
.../org/apache/flink/yarn/YarnResourceManager.java | 36 ++++++++++++++++++----
12 files changed, 151 insertions(+), 22 deletions(-)
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
index 895c66a..8a9f93a 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
@@ -28,6 +28,7 @@ import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.mesos.util.MesosResourceAllocation;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
import org.apache.flink.util.Preconditions;
import com.netflix.fenzo.ConstraintEvaluator;
@@ -135,7 +136,11 @@ public class LaunchableMesosWorker implements LaunchableTask {
@Override
public double getMemory() {
- return params.containeredParameters().taskManagerTotalMemoryMB();
+ if (params.containeredParameters().getTaskExecutorResourceSpec() == null) { // flip49 disabled
+ return params.containeredParameters().taskManagerTotalMemoryMB(); // legacy path: total container memory in MB
+ } else { // flip49 enabled: report the spec's total process memory, converted to MiB
+ return params.containeredParameters().getTaskExecutorResourceSpec().getTotalProcessMemorySize().getMebiBytes();
+ }
}
@Override
@@ -275,10 +280,15 @@ public class LaunchableMesosWorker implements LaunchableTask {
env.addVariables(variable(MesosConfigKeys.ENV_FLINK_CONTAINER_ID, taskInfo.getTaskId().getValue()));
// finalize the memory parameters
- jvmArgs.append(" -Xms").append(tmParams.taskManagerHeapSizeMB()).append("m");
- jvmArgs.append(" -Xmx").append(tmParams.taskManagerHeapSizeMB()).append("m");
- if (tmParams.taskManagerDirectMemoryLimitMB() >= 0) {
- jvmArgs.append(" -XX:MaxDirectMemorySize=").append(tmParams.taskManagerDirectMemoryLimitMB()).append("m");
+ if (tmParams.getTaskExecutorResourceSpec() == null) { // flip49 disabled
+ jvmArgs.append(" -Xms").append(tmParams.taskManagerHeapSizeMB()).append("m");
+ jvmArgs.append(" -Xmx").append(tmParams.taskManagerHeapSizeMB()).append("m");
+ if (tmParams.taskManagerDirectMemoryLimitMB() >= 0) {
+ jvmArgs.append(" -XX:MaxDirectMemorySize=").append(tmParams.taskManagerDirectMemoryLimitMB()).append(
+ "m");
+ }
+ } else { // flip49 enabled
+ jvmArgs.append(" ").append(TaskExecutorResourceUtils.generateJvmParametersStr(tmParams.getTaskExecutorResourceSpec()));
}
// pass dynamic system properties
@@ -301,6 +311,9 @@ public class LaunchableMesosWorker implements LaunchableTask {
.append(params.command())
.append(" ")
.append(ContainerSpecification.formatSystemProperties(dynamicProperties));
+ if (tmParams.getTaskExecutorResourceSpec() != null) { // flip49 enabled
+ launchCommand.append(" ").append(TaskExecutorResourceUtils.generateDynamicConfigsStr(tmParams.getTaskExecutorResourceSpec()));
+ }
cmd.setValue(launchCommand.toString());
// build the container info
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
index 1d49000..fcad191 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
@@ -24,6 +24,8 @@ import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
import org.apache.flink.util.Preconditions;
import com.netflix.fenzo.ConstraintEvaluator;
@@ -333,9 +335,16 @@ public class MesosTaskManagerParameters {
public static MesosTaskManagerParameters create(Configuration flinkConfig) {
List<ConstraintEvaluator> constraints = parseConstraints(flinkConfig.getString(MESOS_CONSTRAINTS_HARD_HOSTATTR));
+ TaskExecutorResourceSpec taskExecutorResourceSpec;
+ if (flinkConfig.getBoolean(TaskManagerOptions.ENABLE_FLIP_49_CONFIG)) {
+ taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(flinkConfig);
+ } else {
+ taskExecutorResourceSpec = null;
+ }
// parse the common parameters
ContaineredTaskManagerParameters containeredParameters = ContaineredTaskManagerParameters.create(
flinkConfig,
+ taskExecutorResourceSpec,
flinkConfig.getInteger(MESOS_RM_TASKS_MEMORY_MB),
flinkConfig.getInteger(MESOS_RM_TASKS_SLOTS));
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index b2eb177..177bf2f 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -271,7 +271,7 @@ public class MesosResourceManagerTest extends TestLogger {
// TaskExecutor templating
ContainerSpecification containerSpecification = new ContainerSpecification();
ContaineredTaskManagerParameters containeredParams =
- new ContaineredTaskManagerParameters(1024, 768, 256, 4, new HashMap<String, String>());
+ new ContaineredTaskManagerParameters(null, 1024, 768, 256, 4, new HashMap<String, String>());
MesosTaskManagerParameters tmParams = new MesosTaskManagerParameters(
1.0, 1, 0, MesosTaskManagerParameters.ContainerType.MESOS, Option.<String>empty(), containeredParams,
Collections.<Protos.Volume>emptyList(), Collections.<Protos.Parameter>emptyList(), false,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 0b156ca..e18edf3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -395,7 +395,12 @@ public class BootstrapTools {
tmParams.taskManagerDirectMemoryLimitMB()));
}
- startCommandValues.put("jvmmem", StringUtils.join(params, ' '));
+ final TaskExecutorResourceSpec taskExecutorResourceSpec = tmParams.getTaskExecutorResourceSpec();
+ if (taskExecutorResourceSpec == null) { // FLIP-49 disabled
+ startCommandValues.put("jvmmem", StringUtils.join(params, ' '));
+ } else { // FLIP-49 enabled
+ startCommandValues.put("jvmmem", TaskExecutorResourceUtils.generateJvmParametersStr(taskExecutorResourceSpec));
+ }
String javaOpts = flinkConfig.getString(CoreOptions.FLINK_JVM_OPTIONS);
if (flinkConfig.getString(CoreOptions.FLINK_TM_JVM_OPTIONS).length() > 0) {
@@ -427,7 +432,10 @@ public class BootstrapTools {
startCommandValues.put("redirects",
"1> " + logDirectory + "/taskmanager.out " +
"2> " + logDirectory + "/taskmanager.err");
- startCommandValues.put("args", "--configDir " + configDirectory);
+
+ String argsStr = taskExecutorResourceSpec == null ? "" :
+ TaskExecutorResourceUtils.generateDynamicConfigsStr(taskExecutorResourceSpec) + " ";
+ startCommandValues.put("args", argsStr + "--configDir " + configDirectory);
final String commandTemplate = flinkConfig
.getString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
index a4e7d25..af8b7fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.util.Preconditions;
+import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
@@ -48,13 +49,19 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
/** Environment variables to add to the Java process. */
private final HashMap<String, String> taskManagerEnv;
+ @Nullable // should only be null when flip49 is disabled
+ private final TaskExecutorResourceSpec taskExecutorResourceSpec;
+
public ContaineredTaskManagerParameters(
+ @Nullable // should only be null when flip49 is disabled
+ TaskExecutorResourceSpec taskExecutorResourceSpec,
long totalContainerMemoryMB,
long taskManagerHeapSizeMB,
long taskManagerDirectMemoryLimitMB,
int numSlots,
HashMap<String, String> taskManagerEnv) {
+ this.taskExecutorResourceSpec = taskExecutorResourceSpec;
this.totalContainerMemoryMB = totalContainerMemoryMB;
this.taskManagerHeapSizeMB = taskManagerHeapSizeMB;
this.taskManagerDirectMemoryLimitMB = taskManagerDirectMemoryLimitMB;
@@ -64,6 +71,11 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
// ------------------------------------------------------------------------
+ @Nullable // should only be null when flip49 is disabled
+ public TaskExecutorResourceSpec getTaskExecutorResourceSpec() {
+ return taskExecutorResourceSpec; // returned as stored by the constructor; a null value is passed through unchanged
+ }
+
public long taskManagerTotalMemoryMB() {
return totalContainerMemoryMB;
}
@@ -90,7 +102,8 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
@Override
public String toString() {
return "TaskManagerParameters {" +
- "totalContainerMemory=" + totalContainerMemoryMB +
+ "taskExecutorResourceSpec=" + (taskExecutorResourceSpec == null ? "null" : taskExecutorResourceSpec) +
+ ", totalContainerMemory=" + totalContainerMemoryMB +
", taskManagerHeapSize=" + taskManagerHeapSizeMB +
", taskManagerDirectMemoryLimit=" + taskManagerDirectMemoryLimitMB +
", numSlots=" + numSlots +
@@ -151,6 +164,8 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
*/
public static ContaineredTaskManagerParameters create(
Configuration config,
+ @Nullable // should only be null when flip49 is disabled
+ TaskExecutorResourceSpec taskExecutorResourceSpec,
long containerMemoryMB,
int numSlots) {
// (1) try to compute how much memory used by container
@@ -175,6 +190,6 @@ public class ContaineredTaskManagerParameters implements java.io.Serializable {
// done
return new ContaineredTaskManagerParameters(
- containerMemoryMB, heapSizeMB, offHeapSizeMB, numSlots, envVars);
+ taskExecutorResourceSpec, containerMemoryMB, heapSizeMB, offHeapSizeMB, numSlots, envVars);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java
index 0813aea..d6cbe5b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorResourceSpec.java
@@ -74,7 +74,7 @@ import org.apache.flink.configuration.MemorySize;
* └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
* </pre>
*/
-public class TaskExecutorResourceSpec {
+public class TaskExecutorResourceSpec implements java.io.Serializable {
private final MemorySize frameworkHeapSize;
@@ -155,4 +155,18 @@ public class TaskExecutorResourceSpec {
public MemorySize getTotalProcessMemorySize() {
return getTotalFlinkMemorySize().add(jvmMetaspaceSize).add(jvmOverheadSize);
}
+
+ @Override
+ public String toString() { // renders all eight memory components as "name=value" pairs inside "TaskExecutorResourceSpec {...}"
+ return "TaskExecutorResourceSpec {"
+ + "frameworkHeapSize=" + frameworkHeapSize.toString() // NOTE(review): explicit toString() is redundant in concatenation and would throw on a null component
+ + ", taskHeapSize=" + taskHeapSize.toString()
+ + ", taskOffHeapSize=" + taskOffHeapSize.toString()
+ + ", shuffleMemSize=" + shuffleMemSize.toString()
+ + ", onHeapManagedMemorySize=" + onHeapManagedMemorySize.toString()
+ + ", offHeapManagedMemorySize=" + offHeapManagedMemorySize.toString()
+ + ", jvmMetaspaceSize=" + jvmMetaspaceSize.toString()
+ + ", jvmOverheadSize=" + jvmOverheadSize.toString()
+ + "}";
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index c89a88d..a2aa9ac 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -27,6 +27,8 @@ import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
@@ -1210,5 +1212,15 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
final ResourceProfile resourceProfile = TaskManagerServices.computeSlotResourceProfile(numSlots, managedMemoryBytes);
return Collections.nCopies(numSlots, resourceProfile);
}
+
+ @Nullable // should only be null when flip49 is disabled
+ public static TaskExecutorResourceSpec createTaskExecutorResourceSpec(Configuration config) { // builds the spec from config, or yields null when the FLIP-49 flag is off
+ final boolean enableFlip49 = config.getBoolean(TaskManagerOptions.ENABLE_FLIP_49_CONFIG); // feature flag read from the given config
+ if (enableFlip49) {
+ return TaskExecutorResourceUtils.resourceSpecFromConfig(config); // FLIP-49 enabled: derive the spec from the same config
+ } else {
+ return null; // FLIP-49 disabled: no spec; callers must null-check the result
+ }
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
index 04ad29c..3cc3168 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/BootstrapToolsTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.clusterframework;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.util.ExceptionUtils;
@@ -143,8 +144,40 @@ public class BootstrapToolsTest extends TestLogger {
@Test
public void testGetTaskManagerShellCommand() {
final Configuration cfg = new Configuration();
+ final TaskExecutorResourceSpec taskExecutorResourceSpec = new TaskExecutorResourceSpec( // non-null spec, so the command is built from the resource spec
+ new MemorySize(0), // frameworkHeapSize
+ new MemorySize(111), // taskHeapSize
+ new MemorySize(0), // taskOffHeapSize
+ new MemorySize(222), // shuffleMemSize
+ new MemorySize(0), // onHeapManagedMemorySize
+ new MemorySize(0), // offHeapManagedMemorySize
+ new MemorySize(333), // jvmMetaspaceSize
+ new MemorySize(0)); // jvmOverheadSize
+ final ContaineredTaskManagerParameters containeredParams = new ContaineredTaskManagerParameters(
+ taskExecutorResourceSpec, 1024, 768, 256, 4, new HashMap<String, String>());
+
+ // no logging, with/out krb5
+ final String java = "$JAVA_HOME/bin/java";
+ final String jvmmem = "-Xmx111 -Xms111 -XX:MaxDirectMemorySize=222 -XX:MaxMetaspaceSize=333"; // numbers match taskHeapSize (111), shuffleMemSize (222) and jvmMetaspaceSize (333) above
+ final String mainClass =
+ "org.apache.flink.runtime.clusterframework.BootstrapToolsTest";
+ final String dynamicConfigs = TaskExecutorResourceUtils.generateDynamicConfigsStr(taskExecutorResourceSpec).trim(); // trim() drops leading/trailing whitespace before the value is joined below
+ final String args = dynamicConfigs + " --configDir ./conf"; // dynamic configs are expected ahead of --configDir
+ final String redirects =
+ "1> ./logs/taskmanager.out 2> ./logs/taskmanager.err";
+
+ assertEquals(
+ java + " " + jvmmem + " " + mainClass + " " + args + " " + redirects, // expected command: parts joined by single spaces
+ BootstrapTools
+ .getTaskManagerShellCommand(cfg, containeredParams, "./conf", "./logs",
+ false, false, false, this.getClass()));
+ }
+
+ @Test
+ public void testGetTaskManagerShellCommandLegacy() {
+ final Configuration cfg = new Configuration();
final ContaineredTaskManagerParameters containeredParams =
- new ContaineredTaskManagerParameters(1024, 768, 256, 4,
+ new ContaineredTaskManagerParameters(null, 1024, 768, 256, 4,
new HashMap<String, String>());
// no logging, with/out krb5
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
index a1f4cad..7fe09fd 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParametersTest.java
@@ -45,7 +45,7 @@ public class ContaineredTaskManagerParametersTest extends TestLogger {
Configuration conf = new Configuration();
ContaineredTaskManagerParameters params =
- ContaineredTaskManagerParameters.create(conf, CONTAINER_MEMORY, 1);
+ ContaineredTaskManagerParameters.create(conf, null, CONTAINER_MEMORY, 1);
final float memoryCutoffRatio = conf.getFloat(
ConfigConstants.CONTAINERIZED_HEAP_CUTOFF_RATIO,
@@ -80,7 +80,7 @@ public class ContaineredTaskManagerParametersTest extends TestLogger {
conf.setBoolean(MEMORY_OFF_HEAP, false);
ContaineredTaskManagerParameters params =
- ContaineredTaskManagerParameters.create(conf, CONTAINER_MEMORY, 1);
+ ContaineredTaskManagerParameters.create(conf, null, CONTAINER_MEMORY, 1);
assertTrue(params.taskManagerDirectMemoryLimitMB() > 0L);
@@ -98,7 +98,7 @@ public class ContaineredTaskManagerParametersTest extends TestLogger {
conf.setBoolean(MEMORY_OFF_HEAP, true);
ContaineredTaskManagerParameters params =
- ContaineredTaskManagerParameters.create(conf, CONTAINER_MEMORY, 1);
+ ContaineredTaskManagerParameters.create(conf, null, CONTAINER_MEMORY, 1);
assertTrue(params.taskManagerDirectMemoryLimitMB() > 0L);
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
index 3a3144e..8f74aa1 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -187,7 +187,7 @@ public class UtilsTest extends TestLogger {
hdfsDelegationTokenKind, service));
amCredentials.writeTokenStorageFile(new org.apache.hadoop.fs.Path(credentialFile.getAbsolutePath()), yarnConf);
- ContaineredTaskManagerParameters tmParams = new ContaineredTaskManagerParameters(64,
+ ContaineredTaskManagerParameters tmParams = new ContaineredTaskManagerParameters(null, 64,
64, 16, 1, new HashMap<>(1));
Configuration taskManagerConf = new Configuration();
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 2aa476a..8350293 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -177,6 +177,7 @@ public class YarnConfigurationITCase extends YarnTestBase {
final ContaineredTaskManagerParameters containeredTaskManagerParameters = ContaineredTaskManagerParameters.create(
configuration,
+ null,
taskManagerMemory,
slotsPerTaskManager);
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 3a70086..366a029 100755
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -26,6 +26,8 @@ import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -129,6 +131,9 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
private final Resource resource;
+ @Nullable // should only be null when flip49 is disabled
+ private final TaskExecutorResourceSpec taskExecutorResourceSpec;
+
public YarnResourceManager(
RpcService rpcService,
String resourceManagerEndpointId,
@@ -176,7 +181,14 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
this.webInterfaceUrl = webInterfaceUrl;
this.numberOfTaskSlots = flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
- this.defaultTaskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(flinkConfig).getMebiBytes();
+
+ this.taskExecutorResourceSpec = createTaskExecutorResourceSpec(flinkConfig); // null when FLIP-49 is disabled
+ if (taskExecutorResourceSpec != null) { // FLIP-49 enabled
+ // Reuse the spec created above rather than deriving it from flinkConfig a second time into a shadowing local.
+ this.defaultTaskManagerMemoryMB = taskExecutorResourceSpec.getTotalProcessMemorySize().getMebiBytes();
+ } else { // FLIP-49 disabled
+ this.defaultTaskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(flinkConfig).getMebiBytes();
+ }
this.defaultCpus = flinkConfig.getInteger(YarnConfigOptions.VCORES, numberOfTaskSlots);
this.resource = Resource.newInstance(defaultTaskManagerMemoryMB, defaultCpus);
@@ -390,6 +402,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
try {
// Context information used to start a TaskExecutor Java process
ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
+ taskExecutorResourceSpec,
container.getResource(),
containerIdStr,
container.getNodeId().getHost());
@@ -534,20 +547,31 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
RM_REQUEST_PRIORITY);
}
- private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host)
- throws Exception {
+ private ContainerLaunchContext createTaskExecutorLaunchContext(
+ @Nullable // should only be null when flip49 is disabled
+ TaskExecutorResourceSpec tmResourceSpec,
+ Resource resource,
+ String containerId,
+ String host) throws Exception {
+
// init the ContainerLaunchContext
final String currDir = env.get(ApplicationConstants.Environment.PWD.key());
final ContaineredTaskManagerParameters taskManagerParameters =
- ContaineredTaskManagerParameters.create(flinkConfig, resource.getMemory(), numberOfTaskSlots);
+ ContaineredTaskManagerParameters.create(flinkConfig, tmResourceSpec, resource.getMemory(), numberOfTaskSlots);
- log.debug("TaskExecutor {} will be started with container size {} MB, JVM heap size {} MB, " +
- "JVM direct memory limit {} MB",
+ if (tmResourceSpec == null) { // FLIP-49 disabled
+ log.debug("TaskExecutor {} will be started with container size {} MB, JVM heap size {} MB, " +
+ "JVM direct memory limit {} MB",
containerId,
taskManagerParameters.taskManagerTotalMemoryMB(),
taskManagerParameters.taskManagerHeapSizeMB(),
taskManagerParameters.taskManagerDirectMemoryLimitMB());
+ } else { // Flip-49 enabled
+ log.debug("TaskExecutor {} will be started with {}.",
+ containerId,
+ tmResourceSpec);
+ }
Configuration taskManagerConfig = BootstrapTools.cloneConfiguration(flinkConfig);