You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2017/07/28 14:23:17 UTC

spark git commit: [SPARK-21541][YARN] Spark Logs show incorrect job status for a job that does not create SparkContext

Repository: spark
Updated Branches:
  refs/heads/master 784680903 -> 69ab0e4bd


[SPARK-21541][YARN] Spark Logs show incorrect job status for a job that does not create SparkContext

If you run a Spark job without creating a SparkSession or SparkContext, the Spark job logs say the job succeeded, but YARN reports that it failed and retries it 3 times. Also, because the Application Master unregisters with the Resource Manager and exits successfully, it deletes the Spark staging directory. When YARN makes subsequent retries, it cannot find the staging directory, so the retries fail.

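Added a flag that records whether the user has initialized a SparkContext. If it has, the Application Master (AM) is allowed to unregister with the Resource Manager (RM). Otherwise, the AM does not unregister with the RM, and the final application status is reported as FAILED with exit code `EXIT_SC_NOT_INITED`.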

## How was this patch tested?
Manually tested the fix.
Before:
<img width="1253" alt="screen shot-before" src="https://user-images.githubusercontent.com/22228190/28647214-69bf81e2-722b-11e7-9ed0-d416d2bf23be.png">

After:
<img width="1319" alt="screen shot-after" src="https://user-images.githubusercontent.com/22228190/28647220-70f9eea2-722b-11e7-85c6-e56276b15614.png">

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: pgandhi <pg...@yahoo-inc.com>
Author: pgandhi999 <pa...@gmail.com>

Closes #18741 from pgandhi999/SPARK-21541.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/69ab0e4b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/69ab0e4b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/69ab0e4b

Branch: refs/heads/master
Commit: 69ab0e4bddccb461f960fcb48a390a1517e504dd
Parents: 7846809
Author: pgandhi <pg...@yahoo-inc.com>
Authored: Fri Jul 28 09:23:08 2017 -0500
Committer: Tom Graves <tg...@yahoo-inc.com>
Committed: Fri Jul 28 09:23:08 2017 -0500

----------------------------------------------------------------------
 .../spark/deploy/yarn/ApplicationMaster.scala   | 21 +++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/69ab0e4b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index fc92502..ca6a3ef 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -90,6 +90,9 @@ private[spark] class ApplicationMaster(
   @volatile private var reporterThread: Thread = _
   @volatile private var allocator: YarnAllocator = _
 
+  // A flag to check whether user has initialized spark context
+  @volatile private var registered = false
+
   private val userClassLoader = {
     val classpath = Client.getUserClasspath(sparkConf)
     val urls = classpath.map { entry =>
@@ -319,7 +322,7 @@ private[spark] class ApplicationMaster(
    */
   final def unregister(status: FinalApplicationStatus, diagnostics: String = null): Unit = {
     synchronized {
-      if (!unregistered) {
+      if (registered && !unregistered) {
         logInfo(s"Unregistering ApplicationMaster with $status" +
           Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
         unregistered = true
@@ -332,10 +335,15 @@ private[spark] class ApplicationMaster(
     synchronized {
       if (!finished) {
         val inShutdown = ShutdownHookManager.inShutdown()
-        logInfo(s"Final app status: $status, exitCode: $code" +
+        if (registered) {
+          exitCode = code
+          finalStatus = status
+        } else {
+          finalStatus = FinalApplicationStatus.FAILED
+          exitCode = ApplicationMaster.EXIT_SC_NOT_INITED
+        }
+        logInfo(s"Final app status: $finalStatus, exitCode: $exitCode" +
           Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
-        exitCode = code
-        finalStatus = status
         finalMsg = msg
         finished = true
         if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) {
@@ -439,12 +447,11 @@ private[spark] class ApplicationMaster(
           sc.getConf.get("spark.driver.port"),
           isClusterMode = true)
         registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), securityMgr)
+        registered = true
       } else {
         // Sanity check; should never happen in normal operation, since sc should only be null
         // if the user app did not create a SparkContext.
-        if (!finished) {
-          throw new IllegalStateException("SparkContext is null but app is still running!")
-        }
+        throw new IllegalStateException("User did not initialize spark context!")
       }
       userClassThread.join()
     } catch {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org