You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/05/07 15:23:42 UTC

[flink] 02/02: [FLINK-12342][yarn] Add config option for heartbeat interval during container requests

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3871d4d2bf19d904252ed2fe8fe7cbad9af2c634
Author: Peter Huang <hp...@uber.com>
AuthorDate: Wed May 1 13:50:08 2019 -0700

    [FLINK-12342][yarn] Add config option for heartbeat interval during container requests
    
    Flink's YarnResourceManager sets a faster heartbeat interval when it is requesting containers
    from Yarn's ResourceManager. Since requests and responses are transported via heartbeats, this
    speeds up requests. However, it can also put additional load on Yarn due to excessive container
    requests. Therefore, this commit introduces a config option which allows users to control this heartbeat interval.
    
    This closes #8306.
---
 .../generated/yarn_config_configuration.html        |  7 ++++++-
 .../org/apache/flink/yarn/YarnResourceManager.java  | 11 +++++------
 .../flink/yarn/configuration/YarnConfigOptions.java | 21 ++++++++++++++++++++-
 3 files changed, 31 insertions(+), 8 deletions(-)

diff --git a/docs/_includes/generated/yarn_config_configuration.html b/docs/_includes/generated/yarn_config_configuration.html
index ab7e224..a6b04b6 100644
--- a/docs/_includes/generated/yarn_config_configuration.html
+++ b/docs/_includes/generated/yarn_config_configuration.html
@@ -33,7 +33,12 @@
             <td>The number of virtual cores (vcores) per YARN container. By default, the number of vcores is set to the number of slots per TaskManager, if set, or to 1, otherwise. In order for this parameter to be used your cluster must have CPU scheduling enabled. You can do this by setting the <span markdown="span">`org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler`</span>.</td>
         </tr>
         <tr>
-            <td><h5>yarn.heartbeat-delay</h5></td>
+            <td><h5>yarn.heartbeat.container-request-interval</h5></td>
+            <td style="word-wrap: break-word;">500</td>
+            <td>Time between heartbeats with the ResourceManager in milliseconds if Flink requests containers:<ul><li>The lower this value is, the faster Flink will get notified about container allocations since requests and allocations are transmitted via heartbeats.</li><li>The lower this value is, the more excessive containers might get allocated which will eventually be released but put pressure on Yarn.</li></ul>If you observe too many container allocations on the ResourceManager, then it is recommended to increase this value. See <a href="https://issues.apache.org/jira/browse/YARN-1902">this link</a> for more information.</td>
+        </tr>
+        <tr>
+            <td><h5>yarn.heartbeat.interval</h5></td>
             <td style="word-wrap: break-word;">5</td>
             <td>Time between heartbeats with the ResourceManager in seconds.</td>
         </tr>
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 375c196..d054afe 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -86,10 +86,6 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 
 	/** YARN container map. Package private for unit test purposes. */
 	private final ConcurrentMap<ResourceID, YarnWorkerNode> workerNodeMap;
-
-	/** The heartbeat interval while the resource master is waiting for containers. */
-	private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500;
-
 	/** Environment variable name of the final container id used by the YarnResourceManager.
 	 * Container ID generation may vary across Hadoop versions. */
 	static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
@@ -114,6 +110,9 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 
 	private final int defaultCpus;
 
+	/** The heartbeat interval while the resource master is waiting for containers. */
+	private final int containerRequestHeartbeatIntervalMillis;
+
 	/** Client to communicate with the Resource Manager (YARN's master). */
 	private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;
 
@@ -173,6 +172,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 					yarnHeartbeatIntervalMS, yarnExpiryIntervalMS);
 		}
 		yarnHeartbeatIntervalMillis = yarnHeartbeatIntervalMS;
+		containerRequestHeartbeatIntervalMillis = flinkConfig.getInteger(YarnConfigOptions.CONTAINER_REQUEST_HEARTBEAT_INTERVAL_MILLISECONDS);
 		numPendingContainerRequests = 0;
 
 		this.webInterfaceUrl = webInterfaceUrl;
@@ -514,8 +514,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		resourceManagerClient.addContainerRequest(getContainerRequest());
 
 		// make sure we transmit the request fast and receive fast news of granted allocations
-		resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
-
+		resourceManagerClient.setHeartbeatInterval(containerRequestHeartbeatIntervalMillis);
 		numPendingContainerRequests++;
 
 		log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.",
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index 6abeb0d..40a5929 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -22,7 +22,9 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.description.Description;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.configuration.description.LinkElement.link;
 import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.configuration.description.TextElement.text;
 
 /**
  * This class holds configuration constants used by Flink's YARN runners.
@@ -103,11 +105,28 @@ public class YarnConfigOptions {
 	 * The heartbeat interval between the Application Master and the YARN Resource Manager.
 	 */
 	public static final ConfigOption<Integer> HEARTBEAT_DELAY_SECONDS =
-		key("yarn.heartbeat-delay")
+		key("yarn.heartbeat.interval")
 		.defaultValue(5)
+		.withDeprecatedKeys("yarn.heartbeat-delay")
 		.withDescription("Time between heartbeats with the ResourceManager in seconds.");
 
 	/**
+	 * The heartbeat interval between the Application Master and the YARN Resource Manager
+	 * if Flink is requesting containers.
+	 */
+	public static final ConfigOption<Integer> CONTAINER_REQUEST_HEARTBEAT_INTERVAL_MILLISECONDS =
+		key("yarn.heartbeat.container-request-interval")
+			.defaultValue(500)
+			.withDescription(
+				new Description.DescriptionBuilder()
+					.text("Time between heartbeats with the ResourceManager in milliseconds if Flink requests containers:")
+					.list(
+						text("The lower this value is, the faster Flink will get notified about container allocations since requests and allocations are transmitted via heartbeats."),
+						text("The lower this value is, the more excessive containers might get allocated which will eventually be released but put pressure on Yarn."))
+					.text("If you observe too many container allocations on the ResourceManager, then it is recommended to increase this value. See %s for more information.", link("https://issues.apache.org/jira/browse/YARN-1902", "this link"))
+					.build());
+
+	/**
 	 * When a Flink job is submitted to YARN, the JobManager's host and the number of available
 	 * processing slots is written into a properties file, so that the Flink client is able
 	 * to pick those details up.