You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/05/26 05:37:26 UTC

[flink] 01/02: [FLINK-12152] Make the vcore that Application Master used configurable for Flink on YARN

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4f558e4f225195fefe61718ef09b989f395987ba
Author: yanghua <ya...@gmail.com>
AuthorDate: Fri May 24 15:58:13 2019 +0800

    [FLINK-12152] Make the vcore that Application Master used configurable for Flink on YARN
    
    This closes #8438.
---
 docs/_includes/generated/yarn_config_configuration.html        |  5 +++++
 .../org/apache/flink/yarn/AbstractYarnClusterDescriptor.java   | 10 +++++++++-
 .../org/apache/flink/yarn/configuration/YarnConfigOptions.java |  8 ++++++++
 3 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/docs/_includes/generated/yarn_config_configuration.html b/docs/_includes/generated/yarn_config_configuration.html
index 40dfc09..6615c32 100644
--- a/docs/_includes/generated/yarn_config_configuration.html
+++ b/docs/_includes/generated/yarn_config_configuration.html
@@ -33,6 +33,11 @@
             <td>The port where the application master RPC system is listening.</td>
         </tr>
         <tr>
+            <td><h5>yarn.appmaster.vcores</h5></td>
+            <td style="word-wrap: break-word;">1</td>
+            <td>The number of virtual cores (vcores) used by YARN application master.</td>
+        </tr>
+        <tr>
             <td><h5>yarn.containers.vcores</h5></td>
             <td style="word-wrap: break-word;">-1</td>
             <td>The number of virtual cores (vcores) per YARN container. By default, the number of vcores is set to the number of slots per TaskManager, if set, or to 1, otherwise. In order for this parameter to be used your cluster must have CPU scheduling enabled. You can do this by setting the <span markdown="span">`org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler`</span>.</td>
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 0f24496..3135ecf 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -242,6 +242,14 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			throw new YarnDeploymentException("Couldn't get cluster description, please check on the YarnConfiguration", e);
 		}
 
+		int configuredAmVcores = flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES);
+		if (configuredAmVcores > numYarnMaxVcores) {
+			throw new IllegalConfigurationException(
+				String.format("The number of requested virtual cores for application master %d" +
+						" exceeds the maximum number of virtual cores %d available in the Yarn Cluster.",
+					configuredAmVcores, numYarnMaxVcores));
+		}
+
 		int configuredVcores = flinkConfiguration.getInteger(YarnConfigOptions.VCORES, clusterSpecification.getSlotsPerTaskManager());
 		// don't configure more than the maximum configured number of vcores
 		if (configuredVcores > numYarnMaxVcores) {
@@ -971,7 +979,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		// Set up resource type requirements for ApplicationMaster
 		Resource capability = Records.newRecord(Resource.class);
 		capability.setMemory(clusterSpecification.getMasterMemoryMB());
-		capability.setVirtualCores(1);
+		capability.setVirtualCores(flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES));
 
 		final String customApplicationName = customName != null ? customName : applicationName;
 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index 0f46a57..c0b6cfe 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -50,6 +50,14 @@ public class YarnConfigOptions {
 			.withDescription("The port where the application master RPC system is listening.");
 
 	/**
+	 * The number of virtual cores (vcores) used by the YARN application master.
+	 */
+	public static final ConfigOption<Integer> APP_MASTER_VCORES =
+		key("yarn.appmaster.vcores")
+		.defaultValue(1)
+		.withDescription("The number of virtual cores (vcores) used by YARN application master.");
+
+	/**
 	 * Defines whether user-jars are included in the system class path for per-job-clusters as well as their positioning
 	 * in the path. They can be positioned at the beginning ("FIRST"), at the end ("LAST"), or be positioned based on
 	 * their name ("ORDER").