You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2015/04/07 15:36:28 UTC

spark git commit: [SPARK-3591][YARN]fire and forget for YARN cluster mode

Repository: spark
Updated Branches:
  refs/heads/master ae980eb41 -> b65bad65c


[SPARK-3591][YARN]fire and forget for YARN cluster mode

https://issues.apache.org/jira/browse/SPARK-3591

The output after this patch:
>doggie153:/opt/oss/spark-1.3.0-bin-hadoop2.4/bin # ./spark-submit  --class org.apache.spark.examples.SparkPi --master yarn-cluster ../lib/spark-examples*.jar
15/03/31 21:15:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/31 21:15:25 INFO RMProxy: Connecting to ResourceManager at doggie153/10.177.112.153:8032
15/03/31 21:15:25 INFO Client: Requesting a new application from cluster with 4 NodeManagers
15/03/31 21:15:25 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container)
15/03/31 21:15:25 INFO Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
15/03/31 21:15:25 INFO Client: Setting up container launch context for our AM
15/03/31 21:15:25 INFO Client: Preparing resources for our AM container
15/03/31 21:15:26 INFO Client: Uploading resource file:/opt/oss/spark-1.3.0-bin-hadoop2.4/lib/spark-assembly-1.4.0-SNAPSHOT-hadoop2.4.1.jar -> hdfs://doggie153:9000/user/root/.sparkStaging/application_1427257505534_0016/spark-assembly-1.4.0-SNAPSHOT-hadoop2.4.1.jar
15/03/31 21:15:27 INFO Client: Uploading resource file:/opt/oss/spark-1.3.0-bin-hadoop2.4/lib/spark-examples-1.3.0-hadoop2.4.0.jar -> hdfs://doggie153:9000/user/root/.sparkStaging/application_1427257505534_0016/spark-examples-1.3.0-hadoop2.4.0.jar
15/03/31 21:15:28 INFO Client: Setting up the launch environment for our AM container
15/03/31 21:15:28 INFO SecurityManager: Changing view acls to: root
15/03/31 21:15:28 INFO SecurityManager: Changing modify acls to: root
15/03/31 21:15:28 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/03/31 21:15:28 INFO Client: Submitting application 16 to ResourceManager
15/03/31 21:15:28 INFO YarnClientImpl: Submitted application application_1427257505534_0016
15/03/31 21:15:28 INFO Client: ... waiting before polling ResourceManager for application state
15/03/31 21:15:33 INFO Client: ... polling ResourceManager for application state
15/03/31 21:15:33 INFO Client: Application report for application_1427257505534_0016 (state: RUNNING)
15/03/31 21:15:33 INFO Client:
         client token: N/A
         diagnostics: N/A
         ApplicationMaster host: doggie157
         ApplicationMaster RPC port: 0
         queue: default
         start time: 1427807728307
         final status: UNDEFINED
         tracking URL: http://doggie153:8088/proxy/application_1427257505534_0016/
         user: root

/cc andrewor14

Author: WangTaoTheTonic <wa...@huawei.com>

Closes #5297 from WangTaoTheTonic/SPARK-3591 and squashes the following commits:

c76d232 [WangTaoTheTonic] wrap lines
16c90a8 [WangTaoTheTonic] move up lines to avoid duplicate
fea390d [WangTaoTheTonic] log failed/killed report, style and comment
be1cc2e [WangTaoTheTonic] reword
f0bc54f [WangTaoTheTonic] minor: expose appid in exception messages
ba9b22b [WangTaoTheTonic] wrong config name
e1a4013 [WangTaoTheTonic] revert to the old version and do some robust
19706c0 [WangTaoTheTonic] add a config to control whether to forget
0cbdce8 [WangTaoTheTonic] fire and forget for YARN cluster mode


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b65bad65
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b65bad65
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b65bad65

Branch: refs/heads/master
Commit: b65bad65c3500475b974ca0219f218eef296db2c
Parents: ae980eb
Author: WangTaoTheTonic <wa...@huawei.com>
Authored: Tue Apr 7 08:36:25 2015 -0500
Committer: Thomas Graves <tg...@apache.org>
Committed: Tue Apr 7 08:36:25 2015 -0500

----------------------------------------------------------------------
 .../scala/org/apache/spark/deploy/Client.scala  |  2 +-
 .../deploy/rest/StandaloneRestClient.scala      |  2 +-
 docs/running-on-yarn.md                         |  9 +++
 .../org/apache/spark/deploy/yarn/Client.scala   | 83 ++++++++++++--------
 4 files changed, 61 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b65bad65/core/src/main/scala/org/apache/spark/deploy/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 65238af..8d13b2a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -89,7 +89,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
 
   /* Find out driver status then exit the JVM */
   def pollAndReportStatus(driverId: String) {
-    println(s"... waiting before polling master for driver state")
+    println("... waiting before polling master for driver state")
     Thread.sleep(5000)
     println("... polling master for driver state")
     val statusFuture = (masterActor ? RequestDriverStatus(driverId))(timeout)

http://git-wip-us.apache.org/repos/asf/spark/blob/b65bad65/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala
index a3539e4..b8fd406 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala
@@ -245,7 +245,7 @@ private[deploy] class StandaloneRestClient extends Logging {
       }
     } else {
       val failMessage = Option(submitResponse.message).map { ": " + _ }.getOrElse("")
-      logError("Application submission failed" + failMessage)
+      logError(s"Application submission failed$failMessage")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b65bad65/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index d9f3eb2..b7e68d4 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -196,6 +196,15 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
   It should be no larger than the global number of max attempts in the YARN configuration.
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.submit.waitAppCompletion</code></td>
+  <td>true</td>
+  <td>
+  In YARN cluster mode, controls whether the client waits to exit until the application completes.
+  If set to true, the client process will stay alive reporting the application's status.
+  Otherwise, the client process will exit after submission.
+  </td>
+</tr>
 </table>
 
 # Launching Spark on YARN

http://git-wip-us.apache.org/repos/asf/spark/blob/b65bad65/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 61f8fc3..79d55a0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -66,6 +66,8 @@ private[spark] class Client(
   private val executorMemoryOverhead = args.executorMemoryOverhead // MB
   private val distCacheMgr = new ClientDistributedCacheManager()
   private val isClusterMode = args.isClusterMode
+  private val fireAndForget = isClusterMode &&
+    !sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true)
 
 
   def stop(): Unit = yarnClient.stop()
@@ -564,31 +566,13 @@ private[spark] class Client(
 
       if (logApplicationReport) {
         logInfo(s"Application report for $appId (state: $state)")
-        val details = Seq[(String, String)](
-          ("client token", getClientToken(report)),
-          ("diagnostics", report.getDiagnostics),
-          ("ApplicationMaster host", report.getHost),
-          ("ApplicationMaster RPC port", report.getRpcPort.toString),
-          ("queue", report.getQueue),
-          ("start time", report.getStartTime.toString),
-          ("final status", report.getFinalApplicationStatus.toString),
-          ("tracking URL", report.getTrackingUrl),
-          ("user", report.getUser)
-        )
-
-        // Use more loggable format if value is null or empty
-        val formattedDetails = details
-          .map { case (k, v) =>
-          val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A")
-          s"\n\t $k: $newValue" }
-          .mkString("")
 
         // If DEBUG is enabled, log report details every iteration
         // Otherwise, log them every time the application changes state
         if (log.isDebugEnabled) {
-          logDebug(formattedDetails)
+          logDebug(formatReportDetails(report))
         } else if (lastState != state) {
-          logInfo(formattedDetails)
+          logInfo(formatReportDetails(report))
         }
       }
 
@@ -609,24 +593,57 @@ private[spark] class Client(
     throw new SparkException("While loop is depleted! This should never happen...")
   }
 
+  private def formatReportDetails(report: ApplicationReport): String = {
+    val details = Seq[(String, String)](
+      ("client token", getClientToken(report)),
+      ("diagnostics", report.getDiagnostics),
+      ("ApplicationMaster host", report.getHost),
+      ("ApplicationMaster RPC port", report.getRpcPort.toString),
+      ("queue", report.getQueue),
+      ("start time", report.getStartTime.toString),
+      ("final status", report.getFinalApplicationStatus.toString),
+      ("tracking URL", report.getTrackingUrl),
+      ("user", report.getUser)
+    )
+
+    // Use more loggable format if value is null or empty
+    details.map { case (k, v) =>
+      val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A")
+      s"\n\t $k: $newValue"
+    }.mkString("")
+  }
+
   /**
-   * Submit an application to the ResourceManager and monitor its state.
-   * This continues until the application has exited for any reason.
+   * Submit an application to the ResourceManager.
+   * If spark.yarn.submit.waitAppCompletion is set to true, the client will stay alive
+   * reporting the application's status until the application has exited for any reason.
+   * Otherwise, the client process will exit after submission.
    * If the application finishes with a failed, killed, or undefined status,
    * throw an appropriate SparkException.
    */
   def run(): Unit = {
-    val (yarnApplicationState, finalApplicationStatus) = monitorApplication(submitApplication())
-    if (yarnApplicationState == YarnApplicationState.FAILED ||
-      finalApplicationStatus == FinalApplicationStatus.FAILED) {
-      throw new SparkException("Application finished with failed status")
-    }
-    if (yarnApplicationState == YarnApplicationState.KILLED ||
-      finalApplicationStatus == FinalApplicationStatus.KILLED) {
-      throw new SparkException("Application is killed")
-    }
-    if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
-      throw new SparkException("The final status of application is undefined")
+    val appId = submitApplication()
+    if (fireAndForget) {
+      val report = getApplicationReport(appId)
+      val state = report.getYarnApplicationState
+      logInfo(s"Application report for $appId (state: $state)")
+      logInfo(formatReportDetails(report))
+      if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
+        throw new SparkException(s"Application $appId finished with status: $state")
+      }
+    } else {
+      val (yarnApplicationState, finalApplicationStatus) = monitorApplication(appId)
+      if (yarnApplicationState == YarnApplicationState.FAILED ||
+        finalApplicationStatus == FinalApplicationStatus.FAILED) {
+        throw new SparkException(s"Application $appId finished with failed status")
+      }
+      if (yarnApplicationState == YarnApplicationState.KILLED ||
+        finalApplicationStatus == FinalApplicationStatus.KILLED) {
+        throw new SparkException(s"Application $appId is killed")
+      }
+      if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
+        throw new SparkException(s"The final status of application $appId is undefined")
+      }
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org