You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2017/07/11 18:25:46 UTC

spark git commit: [SPARK-16019][YARN] Use separate RM poll interval when starting client AM.

Repository: spark
Updated Branches:
  refs/heads/master ebc124d4c -> 1cad31f00


[SPARK-16019][YARN] Use separate RM poll interval when starting client AM.

Currently the code monitoring the launch of the client AM uses the value of
spark.yarn.report.interval as the interval for polling the RM; if someone
has set that value to a really large interval, it would take that long to detect
that the client AM has started, which is not expected.

Instead, have a separate config for the interval to use when the client AM is
starting. The other config is still used in cluster mode, and to detect the
status of the client AM after it is already running.

Tested by running client and cluster mode apps with a modified value of
spark.yarn.report.interval, verifying client AM launch is detected before
that interval elapses.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #18380 from vanzin/SPARK-16019.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1cad31f0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1cad31f0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1cad31f0

Branch: refs/heads/master
Commit: 1cad31f00644d899d8e74d58c6eb4e9f72065473
Parents: ebc124d
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Tue Jul 11 11:25:40 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Tue Jul 11 11:25:40 2017 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/deploy/yarn/Client.scala  |  6 ++++--
 .../main/scala/org/apache/spark/deploy/yarn/config.scala  | 10 ++++++++--
 .../scheduler/cluster/YarnClientSchedulerBackend.scala    |  6 +++++-
 3 files changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1cad31f0/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 1dd0715..7caaa91 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -986,13 +986,15 @@ private[spark] class Client(
    * @param appId ID of the application to monitor.
    * @param returnOnRunning Whether to also return the application state when it is RUNNING.
    * @param logApplicationReport Whether to log details of the application report every iteration.
+   * @param interval How often to poll the YARN RM for application status (in ms).
    * @return A pair of the yarn application state and the final application state.
    */
   def monitorApplication(
       appId: ApplicationId,
       returnOnRunning: Boolean = false,
-      logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = {
-    val interval = sparkConf.get(REPORT_INTERVAL)
+      logApplicationReport: Boolean = true,
+      interval: Long = sparkConf.get(REPORT_INTERVAL)):
+      (YarnApplicationState, FinalApplicationStatus) = {
     var lastState: YarnApplicationState = null
     while (true) {
       Thread.sleep(interval)

http://git-wip-us.apache.org/repos/asf/spark/blob/1cad31f0/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index d4108ca..187803c 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -127,7 +127,7 @@ package object config {
     .stringConf
     .createOptional
 
-  /* Cluster-mode launcher configuration. */
+  /* Launcher configuration. */
 
   private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion")
     .doc("In cluster mode, whether to wait for the application to finish before exiting the " +
@@ -136,10 +136,16 @@ package object config {
     .createWithDefault(true)
 
   private[spark] val REPORT_INTERVAL = ConfigBuilder("spark.yarn.report.interval")
-    .doc("Interval between reports of the current app status in cluster mode.")
+    .doc("Interval between reports of the current app status.")
     .timeConf(TimeUnit.MILLISECONDS)
     .createWithDefaultString("1s")
 
+  private[spark] val CLIENT_LAUNCH_MONITOR_INTERVAL =
+    ConfigBuilder("spark.yarn.clientLaunchMonitorInterval")
+      .doc("Interval between requests for status the client mode AM when starting the app.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("1s")
+
   /* Shared Client-mode AM / Driver configuration. */
 
   private[spark] val AM_MAX_WAIT_TIME = ConfigBuilder("spark.yarn.am.waitTime")

http://git-wip-us.apache.org/repos/asf/spark/blob/1cad31f0/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 60da356..d482376 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState
 
 import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil}
+import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.launcher.SparkAppHandle
 import org.apache.spark.scheduler.TaskSchedulerImpl
@@ -77,8 +78,11 @@ private[spark] class YarnClientSchedulerBackend(
    * This assumes both `client` and `appId` have already been set.
    */
   private def waitForApplication(): Unit = {
+    val monitorInterval = conf.get(CLIENT_LAUNCH_MONITOR_INTERVAL)
+
     assert(client != null && appId.isDefined, "Application has not been submitted yet!")
-    val (state, _) = client.monitorApplication(appId.get, returnOnRunning = true) // blocking
+    val (state, _) = client.monitorApplication(appId.get, returnOnRunning = true,
+      interval = monitorInterval) // blocking
     if (state == YarnApplicationState.FINISHED ||
       state == YarnApplicationState.FAILED ||
       state == YarnApplicationState.KILLED) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org