You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by tgravescs <gi...@git.apache.org> on 2014/09/29 17:15:44 UTC

[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

GitHub user tgravescs opened a pull request:

    https://github.com/apache/spark/pull/2577

    [SPARK-3627] - [yarn] - fix exit code and final status reporting to RM

    See the description and whats handled in the jira comment: https://issues.apache.org/jira/browse/SPARK-3627?focusedCommentId=14150013&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14150013
    
    This does not handle yarn client mode reporting of the driver to the AM.   I think that should be handled when we make it an unmanaged AM.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tgravescs/spark SPARK-3627

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2577.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2577
    
----
commit d3cc80050dfb472c426d1c1399edd9134e53a563
Author: Thomas Graves <tg...@apache.org>
Date:   2014-09-29T13:38:43Z

    SPARK-3627 - yarn - fix exit code and final status reporting to RM

commit f0b65199f50bae99aa89ea3a5915b610d7134392
Author: Thomas Graves <tg...@apache.org>
Date:   2014-09-29T15:05:29Z

    change order of cleanup staging dir

commit 32f4dfa5d0778e3e7bd7c5da27c71f8c3f5930f6
Author: Thomas Graves <tg...@apache.org>
Date:   2014-09-29T15:10:09Z

    switch back

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18238861
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -450,6 +511,15 @@ object ApplicationMaster extends Logging {
     
       val SHUTDOWN_HOOK_PRIORITY: Int = 30
     
    +  // exit codes for different causes, no reason behind the values
    +  val EXIT_SUCCESS = 0
    +  val EXIT_UNCAUGHT_EXCEPTION = 10
    +  val EXIT_MAX_EXECUTOR_FAILURES = 11
    +  val EXIT_REPORTER_FAILURE = 12
    +  val EXIT_SC_NOT_INITED = 13
    +  val EXIT_SECURITY = 14
    +  val EXIT_EXCEPTION_USER_CLASS = 15
    +
    --- End diff --
    
    These should all be private


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2577#issuecomment-57808155
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21248/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/2577#issuecomment-57857279
  
    Hey @tgravescs this LGTM pending a few minor comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2577#issuecomment-57325372
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21040/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18238499
  
    --- Diff: yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala ---
    @@ -66,14 +70,16 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
         appAttemptId
       }
     
    -  override def shutdown(status: FinalApplicationStatus, diagnostics: String = "") = {
    -    val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
    -      .asInstanceOf[FinishApplicationMasterRequest]
    -    finishReq.setAppAttemptId(getAttemptId())
    -    finishReq.setFinishApplicationStatus(status)
    -    finishReq.setDiagnostics(diagnostics)
    -    finishReq.setTrackingUrl(uiHistoryAddress)
    -    resourceManager.finishApplicationMaster(finishReq)
    +  override def unregister(status: FinalApplicationStatus, diagnostics: String = "") = synchronized {
    +    if (registered) {
    +      val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
    +        .asInstanceOf[FinishApplicationMasterRequest]
    --- End diff --
    
    You probably don't need this cast


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2577#issuecomment-57187881
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20975/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18240353
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -328,10 +348,18 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       private def waitForSparkDriver(): ActorRef = {
         logInfo("Waiting for Spark driver to be reachable.")
         var driverUp = false
    +    var count = 0
         val hostport = args.userArgs(0)
         val (driverHost, driverPort) = Utils.parseHostPort(hostport)
    -    while (!driverUp) {
    +
    +    // spark driver should already be up since it launched us, but we don't want to
    +    // wait forever, so wait 100 seconds max to match the cluster mode setting.
    +    // Leave this config unpublished for now.
    +    val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.client.waitTries", 1000)
    --- End diff --
    
    This config should use camel case for `applicationMaster`. Also, there's already a `spark.yarn.applicationMaster.waitTries`. Does the extra `client` mean it's for client mode? Do we want a separate setting for client vs deploy modes here?
    
    By the way there is a mismatch between what is already there `spark.yarn.ApplicationMatser.waitTries` and what we document `spark.yarn.applicationMaster.waitTries`. I think this is a bug that we can fix separately.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2577#issuecomment-58026499
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21330/consoleFull) for   PR 2577 at commit [`9c2efbf`](https://github.com/apache/spark/commit/9c2efbfd8d199bf89f911e44c7b07c6afe6b15bd).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18418623
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -383,40 +405,82 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
         }
       }
     
    +  /**
    +   * This system security manager applies to the entire process.
    +   * It's main purpose is to handle the case if the user code does a System.exit.
    +   * This allows us to catch that and properly set the YARN application status and
    +   * cleanup if needed.
    +   */
    +  private def setupSystemSecurityManager() = {
    +    try {
    +      var stopped = false
    +      System.setSecurityManager(new java.lang.SecurityManager() {
    +        override def checkExit(paramInt: Int) {
    +          if (!stopped) {
    +            logInfo("In securityManager checkExit, exit code: " + paramInt)
    +            if (paramInt == 0) {
    +              finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
    +            } else {
    +              finish(FinalApplicationStatus.FAILED,
    +                paramInt,
    +                "User class exited with non-zero exit code")
    +            }
    +            stopped = true
    +          }
    +        }
    +        // required for the checkExit to work properly
    +        override def checkPermission(perm: java.security.Permission): Unit = {
    +        }
    --- End diff --
    
    can you bump this up one line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18174440
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -232,32 +285,27 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
           override def run() {
             var failureCount = 0
     
    -        while (!finished) {
    +        while (!finished && !Thread.currentThread().isInterrupted()) {
               try {
    -            checkNumExecutorsFailed()
    -            if (!finished) {
    +
    --- End diff --
    
    nit: blank line not needed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/2577#issuecomment-57347769
  
    LGTM. Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18238788
  
    --- Diff: yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala ---
    @@ -66,14 +70,16 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
         appAttemptId
       }
     
    -  override def shutdown(status: FinalApplicationStatus, diagnostics: String = "") = {
    -    val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
    -      .asInstanceOf[FinishApplicationMasterRequest]
    -    finishReq.setAppAttemptId(getAttemptId())
    -    finishReq.setFinishApplicationStatus(status)
    -    finishReq.setDiagnostics(diagnostics)
    -    finishReq.setTrackingUrl(uiHistoryAddress)
    -    resourceManager.finishApplicationMaster(finishReq)
    +  override def unregister(status: FinalApplicationStatus, diagnostics: String = "") = synchronized {
    +    if (registered) {
    +      val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
    +        .asInstanceOf[FinishApplicationMasterRequest]
    --- End diff --
    
    this pr didn't change this code, other then wrapping it with an if.  Its also going to be deprecated soon so I don't see a reason to fix  it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18249218
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -383,40 +404,80 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
         }
       }
     
    +  /**
    +   * Start the user class, which contains the spark driver.
    +   * If the main routine exits cleanly or exits with System.exit(0) we
    +   * assume it was successful, for all other cases we assume failure.
    +   */
       private def startUserClass(): Thread = {
         logInfo("Starting the user JAR in a separate Thread")
         System.setProperty("spark.executor.instances", args.numExecutors.toString)
    +    var stopped = false
         val mainMethod = Class.forName(args.userClass, false,
           Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     
    -    userClassThread = new Thread {
    +    val userThread = new Thread {
           override def run() {
    -        var status = FinalApplicationStatus.FAILED
    +
    +        try {
    +          // Note this security manager applies to the entire process, not
    +          // just this thread. It's here to handle the case if the user code
    +          // does System.exit
    +          System.setSecurityManager(new java.lang.SecurityManager() {
    +            override def checkExit(paramInt: Int) {
    +              if (!stopped) {
    +                logInfo("In securityManager checkExit, exit code: " + paramInt)
    +                if (paramInt == 0) {
    +                  finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
    +                } else {
    +                  finish(FinalApplicationStatus.FAILED,
    +                    paramInt,
    +                    "User class exited with non-zero exit code")
    +                }
    +                stopped = true
    +              }
    +            }
    +
    +            // required for the checkExit to work properly
    +            override def checkPermission(perm: java.security.Permission): Unit = {
    +            }
    +          })
    +        }
    +        catch {
    +          case e: SecurityException => {
    +            finish(FinalApplicationStatus.FAILED,
    +              ApplicationMaster.EXIT_SECURITY,
    +              "Error in setSecurityManager")
    +            logError("Error in setSecurityManager:", e)
    +          }
    +        }
    +
    --- End diff --
    
    Not a big deal, but I think it'll make the content of this thread easier to read by minimizing the logic we put in it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18240375
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -343,6 +371,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
               Thread.sleep(100)
           }
         }
    +
    +    if (!driverUp) {
    +      throw new Exception("Failed to connect to driver!")
    --- End diff --
    
    Can you throw `SparkException` here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18240121
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -383,40 +404,80 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
         }
       }
     
    +  /**
    +   * Start the user class, which contains the spark driver.
    --- End diff --
    
    Thanks for documenting this. Can you add that this is started in a separate thread and this method returns that thread?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18239465
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -383,40 +404,80 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
         }
       }
     
    +  /**
    +   * Start the user class, which contains the spark driver.
    +   * If the main routine exits cleanly or exits with System.exit(0) we
    +   * assume it was successful, for all other cases we assume failure.
    +   */
       private def startUserClass(): Thread = {
         logInfo("Starting the user JAR in a separate Thread")
         System.setProperty("spark.executor.instances", args.numExecutors.toString)
    +    var stopped = false
         val mainMethod = Class.forName(args.userClass, false,
           Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     
    -    userClassThread = new Thread {
    +    val userThread = new Thread {
           override def run() {
    -        var status = FinalApplicationStatus.FAILED
    +
    +        try {
    +          // Note this security manager applies to the entire process, not
    +          // just this thread. It's here to handle the case if the user code
    +          // does System.exit
    +          System.setSecurityManager(new java.lang.SecurityManager() {
    +            override def checkExit(paramInt: Int) {
    +              if (!stopped) {
    +                logInfo("In securityManager checkExit, exit code: " + paramInt)
    +                if (paramInt == 0) {
    +                  finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
    +                } else {
    +                  finish(FinalApplicationStatus.FAILED,
    +                    paramInt,
    +                    "User class exited with non-zero exit code")
    +                }
    +                stopped = true
    +              }
    +            }
    +
    +            // required for the checkExit to work properly
    +            override def checkPermission(perm: java.security.Permission): Unit = {
    +            }
    +          })
    +        }
    +        catch {
    +          case e: SecurityException => {
    +            finish(FinalApplicationStatus.FAILED,
    +              ApplicationMaster.EXIT_SECURITY,
    +              "Error in setSecurityManager")
    +            logError("Error in setSecurityManager:", e)
    +          }
    +        }
    +
    --- End diff --
    
    Does this block have to execute inside the thread, since it's a system-wide setting?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18248626
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -71,80 +74,108 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       private val sparkContextRef = new AtomicReference[SparkContext](null)
     
       final def run(): Int = {
    -    val appAttemptId = client.getAttemptId()
    +    try {
    +      val appAttemptId = client.getAttemptId()
     
    -    if (isDriver) {
    -      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    -      // other spark processes running on the same box
    -      System.setProperty("spark.ui.port", "0")
    +      if (isDriver) {
    +        // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +        // other spark processes running on the same box
    +        System.setProperty("spark.ui.port", "0")
     
    -      // Set the master property to match the requested mode.
    -      System.setProperty("spark.master", "yarn-cluster")
    +        // Set the master property to match the requested mode.
    +        System.setProperty("spark.master", "yarn-cluster")
     
    -      // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
    -      System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    -    }
    +        // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
    +        System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    +      }
     
    -    logInfo("ApplicationAttemptId: " + appAttemptId)
    +      logInfo("ApplicationAttemptId: " + appAttemptId)
     
    -    val cleanupHook = new Runnable {
    -      override def run() {
    -        // If the SparkContext is still registered, shut it down as a best case effort in case
    -        // users do not call sc.stop or do System.exit().
    -        val sc = sparkContextRef.get()
    -        if (sc != null) {
    -          logInfo("Invoking sc stop from shutdown hook")
    -          sc.stop()
    -          finish(FinalApplicationStatus.SUCCEEDED)
    -        }
    +      val cleanupHook = new Runnable {
    +        override def run() {
    +          // If the SparkContext is still registered, shut it down as a best case effort in case
    +          // users do not call sc.stop or do System.exit().
    +          val sc = sparkContextRef.get()
    +          if (sc != null) {
    +            logInfo("Invoking sc stop from shutdown hook")
    +            sc.stop()
    +          }
    +          val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    +          val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    +
    +          if (!finished) {
    +            // this shouldn't ever happen, but if it does assume weird failure
    +            finish(FinalApplicationStatus.FAILED,
    +              ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +              "shutdown hook called without cleanly finishing")
    +          }
     
    -        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
    -        // running the AM.
    -        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    -        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    -        if (finished || isLastAttempt) {
    -          cleanupStagingDir()
    +          if (!unregistered) {
    +            // we only want to unregister if we don't want the RM to retry
    +            if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
    +              unregister(finalStatus, finalMsg)
    +              cleanupStagingDir()
    +            }
    +          }
             }
           }
    -    }
     
    -    // Use higher priority than FileSystem.
    -    assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
    -    ShutdownHookManager
    -      .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
    +      // Use higher priority than FileSystem.
    +      assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
    +      ShutdownHookManager
    +        .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
     
    -    // Call this to force generation of secret so it gets populated into the
    -    // Hadoop UGI. This has to happen before the startUserClass which does a
    -    // doAs in order for the credentials to be passed on to the executor containers.
    -    val securityMgr = new SecurityManager(sparkConf)
    +      // Call this to force generation of secret so it gets populated into the
    +      // Hadoop UGI. This has to happen before the startUserClass which does a
    +      // doAs in order for the credentials to be passed on to the executor containers.
    +      val securityMgr = new SecurityManager(sparkConf)
     
    -    if (isDriver) {
    -      runDriver(securityMgr)
    -    } else {
    -      runExecutorLauncher(securityMgr)
    +      if (isDriver) {
    +        runDriver(securityMgr)
    +      } else {
    +        runExecutorLauncher(securityMgr)
    +      }
    +    } catch {
    +      case e: Throwable => {
    +        // catch everything else if not specifically handled
    +        logError("Uncaught exception: ", e)
    +        finish(FinalApplicationStatus.FAILED,
    +          ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +          "Uncaught exception: " + e.getMessage())
    +      }
    --- End diff --
    
    I see, though this kind of assumes that there's not much code to execute after this catch, which  happens to be true in our case but not always. I think it makes sense to just catch `Exception`s here and let `Throwable`s surface.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2577#issuecomment-57187866
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20975/consoleFull) for   PR 2577 at commit [`32f4dfa`](https://github.com/apache/spark/commit/32f4dfa5d0778e3e7bd7c5da27c71f8c3f5930f6).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2577#issuecomment-57807655
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21248/consoleFull) for   PR 2577 at commit [`24c98e3`](https://github.com/apache/spark/commit/24c98e3154ee2ee93dc4c958ac982b534a798972).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18249133
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -328,10 +348,18 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       private def waitForSparkDriver(): ActorRef = {
         logInfo("Waiting for Spark driver to be reachable.")
         var driverUp = false
    +    var count = 0
         val hostport = args.userArgs(0)
         val (driverHost, driverPort) = Utils.parseHostPort(hostport)
    -    while (!driverUp) {
    +
    +    // spark driver should already be up since it launched us, but we don't want to
    +    // wait forever, so wait 100 seconds max to match the cluster mode setting.
    +    // Leave this config unpublished for now.
    +    val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.client.waitTries", 1000)
    --- End diff --
    
    It's kind of inconsistent to use `applicationMaster.client.waitTries` for client mode but `applicationMaster.waitTries` for cluster mode, and the existing documentation for the latter makes no mention of cluster mode even though it's only used there. It's fine to keep the `client` config here but we should make the other one `applicationMaster.cluster.waitTries` in a future JIRA and deprecate the less specific one.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18183146
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -71,80 +74,134 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       private val sparkContextRef = new AtomicReference[SparkContext](null)
     
       final def run(): Int = {
    -    val appAttemptId = client.getAttemptId()
    +    try {
    +      val appAttemptId = client.getAttemptId()
     
    -    if (isDriver) {
    -      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    -      // other spark processes running on the same box
    -      System.setProperty("spark.ui.port", "0")
    +      if (isDriver) {
    +        // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +        // other spark processes running on the same box
    +        System.setProperty("spark.ui.port", "0")
     
    -      // Set the master property to match the requested mode.
    -      System.setProperty("spark.master", "yarn-cluster")
    +        // Set the master property to match the requested mode.
    +        System.setProperty("spark.master", "yarn-cluster")
     
    -      // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
    -      System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    -    }
    +        // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
    +        System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    +      }
     
    -    logInfo("ApplicationAttemptId: " + appAttemptId)
    +      logInfo("ApplicationAttemptId: " + appAttemptId)
     
    -    val cleanupHook = new Runnable {
    -      override def run() {
    -        // If the SparkContext is still registered, shut it down as a best case effort in case
    -        // users do not call sc.stop or do System.exit().
    -        val sc = sparkContextRef.get()
    -        if (sc != null) {
    -          logInfo("Invoking sc stop from shutdown hook")
    -          sc.stop()
    -          finish(FinalApplicationStatus.SUCCEEDED)
    -        }
    +      val cleanupHook = new Runnable {
    +        override def run() {
    +          // If the SparkContext is still registered, shut it down as a best case effort in case
    +          // users do not call sc.stop or do System.exit().
    +          val sc = sparkContextRef.get()
    +          if (sc != null) {
    +            logInfo("Invoking sc stop from shutdown hook")
    +            sc.stop()
    +          }
    +          val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    +          val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    +
    +          if (!finished) {
    +            // this shouldn't ever happen, but if it does assume weird failure
    +            finish(FinalApplicationStatus.FAILED,
    +              ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +              "shutdown hook called without cleanly finishing")
    +          }
     
    -        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
    -        // running the AM.
    -        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    -        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    -        if (finished || isLastAttempt) {
    -          cleanupStagingDir()
    +          if (!unregistered) {
    +            // we only want to unregister if we don't want the RM to retry
    +            if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
    +              unregister(finalStatus, finalMsg)
    +              cleanupStagingDir()
    +            }
    +          }
             }
           }
    -    }
     
    -    // Use higher priority than FileSystem.
    -    assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
    -    ShutdownHookManager
    -      .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
    +      // Use higher priority than FileSystem.
    +      assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
    +      ShutdownHookManager
    +        .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
     
    -    // Call this to force generation of secret so it gets populated into the
    -    // Hadoop UGI. This has to happen before the startUserClass which does a
    -    // doAs in order for the credentials to be passed on to the executor containers.
    -    val securityMgr = new SecurityManager(sparkConf)
    +      // Call this to force generation of secret so it gets populated into the
    +      // Hadoop UGI. This has to happen before the startUserClass which does a
    +      // doAs in order for the credentials to be passed on to the executor containers.
    +      val securityMgr = new SecurityManager(sparkConf)
     
    -    if (isDriver) {
    -      runDriver(securityMgr)
    -    } else {
    -      runExecutorLauncher(securityMgr)
    +      if (isDriver) {
    +        runDriver(securityMgr)
    +      } else {
    +        runExecutorLauncher(securityMgr)
    +      }
    +    } catch {
    +      case e: Throwable => {
    +        // catch everything else if not specifically handled
    +        logError("Uncaught exception: ", e)
    +        finish(FinalApplicationStatus.FAILED,
    +          ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +          "Uncaught exception: " + e.getMessage())
    +      }
         }
    +    exitCode
    +  }
     
    -    if (finalStatus != FinalApplicationStatus.UNDEFINED) {
    -      finish(finalStatus)
    -      0
    -    } else {
    -      1
    +  /**
    +   * unregister is used to completely unregister the application from the ResourceManager.
    +   * This means the ResourceManager will not retry the application attempt on your behalf if
    +   * a failure occurred.
    +   */
    +  final def unregister(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +    if (!unregistered) {
    +      logInfo(s"Unregistering ApplicationMaster with $status" +
    +        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    +      unregistered = true
    +      client.unregister(status, Option(diagnostics).getOrElse(""))
         }
       }
     
    -  final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +  final def finish(status: FinalApplicationStatus, code: Int, msg: String = null) = synchronized {
         if (!finished) {
    -      logInfo(s"Finishing ApplicationMaster with $status"  +
    -        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    -      finished = true
    +      logInfo(s"Final app status: ${status}, exitCode: ${code}" +
    +        Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
    +      exitCode = code
           finalStatus = status
    -      try {
    -        if (Thread.currentThread() != reporterThread) {
    +      finalMsg = msg
    +      finished = true
    +      if (Thread.currentThread() != reporterThread && Option(reporterThread).isDefined) {
    +        logDebug("shutting down reporter thread")
    +        try {
               reporterThread.interrupt()
    -          reporterThread.join()
    +        } catch {
    +          case e => {
    +            logError("Exception trying to shutdown report thread", e)
    +            // since main thread (client mode) could be waiting on this to finish
    +            // just exit here
    +            System.exit(ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION)
    +          }
    +        }
    +      }
    +      if (Thread.currentThread() != userClassThread && Option(userClassThread).isDefined) {
    +        val sc = sparkContextRef.get()
    +        if (sc != null) {
    +          logInfo("Invoking sc stop from finish")
    +          sc.stop()
    --- End diff --
    
    I'm just wondering what will be the side-effects on user code if the context is stopped before the code expects it to. In the end everything will fail anyway, but maybe telling the user code to shut down "nicely" first is better?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18240067
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -383,40 +404,80 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
         }
       }
     
    +  /**
    +   * Start the user class, which contains the spark driver.
    +   * If the main routine exits cleanly or exits with System.exit(0) we
    +   * assume it was successful, for all other cases we assume failure.
    +   */
       private def startUserClass(): Thread = {
         logInfo("Starting the user JAR in a separate Thread")
         System.setProperty("spark.executor.instances", args.numExecutors.toString)
    +    var stopped = false
         val mainMethod = Class.forName(args.userClass, false,
           Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     
    -    userClassThread = new Thread {
    +    val userThread = new Thread {
           override def run() {
    -        var status = FinalApplicationStatus.FAILED
    +
    +        try {
    +          // Note this security manager applies to the entire process, not
    +          // just this thread. It's here to handle the case if the user code
    +          // does System.exit
    +          System.setSecurityManager(new java.lang.SecurityManager() {
    +            override def checkExit(paramInt: Int) {
    +              if (!stopped) {
    +                logInfo("In securityManager checkExit, exit code: " + paramInt)
    +                if (paramInt == 0) {
    +                  finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
    +                } else {
    +                  finish(FinalApplicationStatus.FAILED,
    +                    paramInt,
    +                    "User class exited with non-zero exit code")
    +                }
    +                stopped = true
    +              }
    +            }
    +
    +            // required for the checkExit to work properly
    +            override def checkPermission(perm: java.security.Permission): Unit = {
    +            }
    --- End diff --
    
    can you bump this up one line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18459132
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -383,40 +405,82 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
         }
       }
     
    +  /**
    +   * This system security manager applies to the entire process.
    +   * It's main purpose is to handle the case if the user code does a System.exit.
    +   * This allows us to catch that and properly set the YARN application status and
    +   * cleanup if needed.
    +   */
    +  private def setupSystemSecurityManager() = {
    +    try {
    +      var stopped = false
    +      System.setSecurityManager(new java.lang.SecurityManager() {
    +        override def checkExit(paramInt: Int) {
    +          if (!stopped) {
    +            logInfo("In securityManager checkExit, exit code: " + paramInt)
    +            if (paramInt == 0) {
    +              finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
    +            } else {
    +              finish(FinalApplicationStatus.FAILED,
    +                paramInt,
    +                "User class exited with non-zero exit code")
    +            }
    +            stopped = true
    +          }
    +        }
    +        // required for the checkExit to work properly
    +        override def checkPermission(perm: java.security.Permission): Unit = {
    +        }
    +      })
    +    }
    +    catch {
    +      case e: SecurityException =>
    +        finish(FinalApplicationStatus.FAILED,
    +          ApplicationMaster.EXIT_SECURITY,
    +          "Error in setSecurityManager")
    +        logError("Error in setSecurityManager:", e)
    +    }
    +  }
    +
    +  /**
    +   * Start the user class, which contains the spark driver, in a separate Thread.
    +   * If the main routine exits cleanly or exits with System.exit(0) we
    +   * assume it was successful, for all other cases we assume failure.
    +   *
    +   * Returns the user thread that was started.
    +   */
       private def startUserClass(): Thread = {
         logInfo("Starting the user JAR in a separate Thread")
         System.setProperty("spark.executor.instances", args.numExecutors.toString)
         val mainMethod = Class.forName(args.userClass, false,
           Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     
    -    userClassThread = new Thread {
    +    val userThread = new Thread {
           override def run() {
    -        var status = FinalApplicationStatus.FAILED
             try {
    -          // Copy
               val mainArgs = new Array[String](args.userArgs.size)
               args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
               mainMethod.invoke(null, mainArgs)
    -          // Some apps have "System.exit(0)" at the end.  The user thread will stop here unless
    -          // it has an uncaught exception thrown out.  It needs a shutdown hook to set SUCCEEDED.
    -          status = FinalApplicationStatus.SUCCEEDED
    +          finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
    +          logDebug("Done running users class")
             } catch {
               case e: InvocationTargetException =>
                 e.getCause match {
                   case _: InterruptedException =>
                     // Reporter thread can interrupt to stop user class
    -
    -              case e => throw e
    +              case e: Throwable =>
    --- End diff --
    
    that is fine, but note you didn't comment on this one earlier, you commented somewhere else in the code. this one we end up re-throwing so I wasn't as concerned with it.  I can change it


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18240426
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -231,33 +258,26 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
         val t = new Thread {
           override def run() {
             var failureCount = 0
    -
    -        while (!finished) {
    +        while (!finished && !Thread.currentThread().isInterrupted()) {
    --- End diff --
    
    Is the second check needed? If the thread is interrupted won't this already have exited?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18174075
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -71,80 +74,134 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       private val sparkContextRef = new AtomicReference[SparkContext](null)
     
       final def run(): Int = {
    -    val appAttemptId = client.getAttemptId()
    +    try {
    +      val appAttemptId = client.getAttemptId()
     
    -    if (isDriver) {
    -      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    -      // other spark processes running on the same box
    -      System.setProperty("spark.ui.port", "0")
    +      if (isDriver) {
    +        // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +        // other spark processes running on the same box
    +        System.setProperty("spark.ui.port", "0")
     
    -      // Set the master property to match the requested mode.
    -      System.setProperty("spark.master", "yarn-cluster")
    +        // Set the master property to match the requested mode.
    +        System.setProperty("spark.master", "yarn-cluster")
     
    -      // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
    -      System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    -    }
    +        // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
    +        System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    +      }
     
    -    logInfo("ApplicationAttemptId: " + appAttemptId)
    +      logInfo("ApplicationAttemptId: " + appAttemptId)
     
    -    val cleanupHook = new Runnable {
    -      override def run() {
    -        // If the SparkContext is still registered, shut it down as a best case effort in case
    -        // users do not call sc.stop or do System.exit().
    -        val sc = sparkContextRef.get()
    -        if (sc != null) {
    -          logInfo("Invoking sc stop from shutdown hook")
    -          sc.stop()
    -          finish(FinalApplicationStatus.SUCCEEDED)
    -        }
    +      val cleanupHook = new Runnable {
    +        override def run() {
    +          // If the SparkContext is still registered, shut it down as a best case effort in case
    +          // users do not call sc.stop or do System.exit().
    +          val sc = sparkContextRef.get()
    +          if (sc != null) {
    +            logInfo("Invoking sc stop from shutdown hook")
    +            sc.stop()
    +          }
    +          val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    +          val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    +
    +          if (!finished) {
    +            // this shouldn't ever happen, but if it does assume weird failure
    +            finish(FinalApplicationStatus.FAILED,
    +              ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +              "shutdown hook called without cleanly finishing")
    +          }
     
    -        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
    -        // running the AM.
    -        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    -        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    -        if (finished || isLastAttempt) {
    -          cleanupStagingDir()
    +          if (!unregistered) {
    +            // we only want to unregister if we don't want the RM to retry
    +            if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
    +              unregister(finalStatus, finalMsg)
    +              cleanupStagingDir()
    +            }
    +          }
             }
           }
    -    }
     
    -    // Use higher priority than FileSystem.
    -    assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
    -    ShutdownHookManager
    -      .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
    +      // Use higher priority than FileSystem.
    +      assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
    +      ShutdownHookManager
    +        .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
     
    -    // Call this to force generation of secret so it gets populated into the
    -    // Hadoop UGI. This has to happen before the startUserClass which does a
    -    // doAs in order for the credentials to be passed on to the executor containers.
    -    val securityMgr = new SecurityManager(sparkConf)
    +      // Call this to force generation of secret so it gets populated into the
    +      // Hadoop UGI. This has to happen before the startUserClass which does a
    +      // doAs in order for the credentials to be passed on to the executor containers.
    +      val securityMgr = new SecurityManager(sparkConf)
     
    -    if (isDriver) {
    -      runDriver(securityMgr)
    -    } else {
    -      runExecutorLauncher(securityMgr)
    +      if (isDriver) {
    +        runDriver(securityMgr)
    +      } else {
    +        runExecutorLauncher(securityMgr)
    +      }
    +    } catch {
    +      case e: Throwable => {
    +        // catch everything else if not specifically handled
    +        logError("Uncaught exception: ", e)
    +        finish(FinalApplicationStatus.FAILED,
    +          ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +          "Uncaught exception: " + e.getMessage())
    +      }
         }
    +    exitCode
    +  }
     
    -    if (finalStatus != FinalApplicationStatus.UNDEFINED) {
    -      finish(finalStatus)
    -      0
    -    } else {
    -      1
    +  /**
    +   * unregister is used to completely unregister the application from the ResourceManager.
    +   * This means the ResourceManager will not retry the application attempt on your behalf if
    +   * a failure occurred.
    +   */
    +  final def unregister(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +    if (!unregistered) {
    +      logInfo(s"Unregistering ApplicationMaster with $status" +
    +        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    +      unregistered = true
    +      client.unregister(status, Option(diagnostics).getOrElse(""))
         }
       }
     
    -  final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +  final def finish(status: FinalApplicationStatus, code: Int, msg: String = null) = synchronized {
         if (!finished) {
    -      logInfo(s"Finishing ApplicationMaster with $status"  +
    -        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    -      finished = true
    +      logInfo(s"Final app status: ${status}, exitCode: ${code}" +
    +        Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
    +      exitCode = code
           finalStatus = status
    -      try {
    -        if (Thread.currentThread() != reporterThread) {
    +      finalMsg = msg
    +      finished = true
    +      if (Thread.currentThread() != reporterThread && Option(reporterThread).isDefined) {
    --- End diff --
    
    nit: `Option(...).isDefined` looks weird, maybe just a regular null check? (It's also cheaper, not that it matters though.)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18243513
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -71,80 +74,108 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       private val sparkContextRef = new AtomicReference[SparkContext](null)
     
       final def run(): Int = {
    -    val appAttemptId = client.getAttemptId()
    +    try {
    +      val appAttemptId = client.getAttemptId()
     
    -    if (isDriver) {
    -      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    -      // other spark processes running on the same box
    -      System.setProperty("spark.ui.port", "0")
    +      if (isDriver) {
    +        // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +        // other spark processes running on the same box
    +        System.setProperty("spark.ui.port", "0")
     
    -      // Set the master property to match the requested mode.
    -      System.setProperty("spark.master", "yarn-cluster")
    +        // Set the master property to match the requested mode.
    +        System.setProperty("spark.master", "yarn-cluster")
     
    -      // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
    -      System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    -    }
    +        // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
    +        System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    +      }
     
    -    logInfo("ApplicationAttemptId: " + appAttemptId)
    +      logInfo("ApplicationAttemptId: " + appAttemptId)
     
    -    val cleanupHook = new Runnable {
    -      override def run() {
    -        // If the SparkContext is still registered, shut it down as a best case effort in case
    -        // users do not call sc.stop or do System.exit().
    -        val sc = sparkContextRef.get()
    -        if (sc != null) {
    -          logInfo("Invoking sc stop from shutdown hook")
    -          sc.stop()
    -          finish(FinalApplicationStatus.SUCCEEDED)
    -        }
    +      val cleanupHook = new Runnable {
    +        override def run() {
    +          // If the SparkContext is still registered, shut it down as a best case effort in case
    +          // users do not call sc.stop or do System.exit().
    +          val sc = sparkContextRef.get()
    +          if (sc != null) {
    +            logInfo("Invoking sc stop from shutdown hook")
    +            sc.stop()
    +          }
    +          val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    +          val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    +
    +          if (!finished) {
    +            // this shouldn't ever happen, but if it does assume weird failure
    +            finish(FinalApplicationStatus.FAILED,
    +              ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +              "shutdown hook called without cleanly finishing")
    +          }
     
    -        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
    -        // running the AM.
    -        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    -        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    -        if (finished || isLastAttempt) {
    -          cleanupStagingDir()
    +          if (!unregistered) {
    +            // we only want to unregister if we don't want the RM to retry
    +            if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
    +              unregister(finalStatus, finalMsg)
    +              cleanupStagingDir()
    +            }
    +          }
             }
           }
    -    }
     
    -    // Use higher priority than FileSystem.
    -    assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
    -    ShutdownHookManager
    -      .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
    +      // Use higher priority than FileSystem.
    +      assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
    +      ShutdownHookManager
    +        .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
     
    -    // Call this to force generation of secret so it gets populated into the
    -    // Hadoop UGI. This has to happen before the startUserClass which does a
    -    // doAs in order for the credentials to be passed on to the executor containers.
    -    val securityMgr = new SecurityManager(sparkConf)
    +      // Call this to force generation of secret so it gets populated into the
    +      // Hadoop UGI. This has to happen before the startUserClass which does a
    +      // doAs in order for the credentials to be passed on to the executor containers.
    +      val securityMgr = new SecurityManager(sparkConf)
     
    -    if (isDriver) {
    -      runDriver(securityMgr)
    -    } else {
    -      runExecutorLauncher(securityMgr)
    +      if (isDriver) {
    +        runDriver(securityMgr)
    +      } else {
    +        runExecutorLauncher(securityMgr)
    +      }
    +    } catch {
    +      case e: Throwable => {
    +        // catch everything else if not specifically handled
    +        logError("Uncaught exception: ", e)
    +        finish(FinalApplicationStatus.FAILED,
    +          ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +          "Uncaught exception: " + e.getMessage())
    +      }
    --- End diff --
    
    The reason I swallowed it was to return the exit code I explicitly set.  It doesn't matter to much as re-throwing with exit with 1 but its a little nicer to have the exit code.  We probably shouldn't be swallowing errors or perhaps scala !NonFatal though. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18174477
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -71,80 +74,134 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       private val sparkContextRef = new AtomicReference[SparkContext](null)
     
       final def run(): Int = {
    -    val appAttemptId = client.getAttemptId()
    +    try {
    +      val appAttemptId = client.getAttemptId()
     
    -    if (isDriver) {
    -      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    -      // other spark processes running on the same box
    -      System.setProperty("spark.ui.port", "0")
    +      if (isDriver) {
    +        // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +        // other spark processes running on the same box
    +        System.setProperty("spark.ui.port", "0")
     
    -      // Set the master property to match the requested mode.
    -      System.setProperty("spark.master", "yarn-cluster")
    +        // Set the master property to match the requested mode.
    +        System.setProperty("spark.master", "yarn-cluster")
     
    -      // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
    -      System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    -    }
    +        // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
    +        System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    +      }
     
    -    logInfo("ApplicationAttemptId: " + appAttemptId)
    +      logInfo("ApplicationAttemptId: " + appAttemptId)
     
    -    val cleanupHook = new Runnable {
    -      override def run() {
    -        // If the SparkContext is still registered, shut it down as a best case effort in case
    -        // users do not call sc.stop or do System.exit().
    -        val sc = sparkContextRef.get()
    -        if (sc != null) {
    -          logInfo("Invoking sc stop from shutdown hook")
    -          sc.stop()
    -          finish(FinalApplicationStatus.SUCCEEDED)
    -        }
    +      val cleanupHook = new Runnable {
    +        override def run() {
    +          // If the SparkContext is still registered, shut it down as a best case effort in case
    +          // users do not call sc.stop or do System.exit().
    +          val sc = sparkContextRef.get()
    +          if (sc != null) {
    +            logInfo("Invoking sc stop from shutdown hook")
    +            sc.stop()
    +          }
    +          val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    +          val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    +
    +          if (!finished) {
    +            // this shouldn't ever happen, but if it does assume weird failure
    +            finish(FinalApplicationStatus.FAILED,
    +              ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +              "shutdown hook called without cleanly finishing")
    +          }
     
    -        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
    -        // running the AM.
    -        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    -        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    -        if (finished || isLastAttempt) {
    -          cleanupStagingDir()
    +          if (!unregistered) {
    +            // we only want to unregister if we don't want the RM to retry
    +            if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
    +              unregister(finalStatus, finalMsg)
    +              cleanupStagingDir()
    +            }
    +          }
             }
           }
    -    }
     
    -    // Use higher priority than FileSystem.
    -    assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
    -    ShutdownHookManager
    -      .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
    +      // Use higher priority than FileSystem.
    +      assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
    +      ShutdownHookManager
    +        .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
     
    -    // Call this to force generation of secret so it gets populated into the
    -    // Hadoop UGI. This has to happen before the startUserClass which does a
    -    // doAs in order for the credentials to be passed on to the executor containers.
    -    val securityMgr = new SecurityManager(sparkConf)
    +      // Call this to force generation of secret so it gets populated into the
    +      // Hadoop UGI. This has to happen before the startUserClass which does a
    +      // doAs in order for the credentials to be passed on to the executor containers.
    +      val securityMgr = new SecurityManager(sparkConf)
     
    -    if (isDriver) {
    -      runDriver(securityMgr)
    -    } else {
    -      runExecutorLauncher(securityMgr)
    +      if (isDriver) {
    +        runDriver(securityMgr)
    +      } else {
    +        runExecutorLauncher(securityMgr)
    +      }
    +    } catch {
    +      case e: Throwable => {
    +        // catch everything else if not specifically handled
    +        logError("Uncaught exception: ", e)
    +        finish(FinalApplicationStatus.FAILED,
    +          ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +          "Uncaught exception: " + e.getMessage())
    +      }
         }
    +    exitCode
    +  }
     
    -    if (finalStatus != FinalApplicationStatus.UNDEFINED) {
    -      finish(finalStatus)
    -      0
    -    } else {
    -      1
    +  /**
    +   * unregister is used to completely unregister the application from the ResourceManager.
    +   * This means the ResourceManager will not retry the application attempt on your behalf if
    +   * a failure occurred.
    +   */
    +  final def unregister(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +    if (!unregistered) {
    +      logInfo(s"Unregistering ApplicationMaster with $status" +
    +        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    +      unregistered = true
    +      client.unregister(status, Option(diagnostics).getOrElse(""))
         }
       }
     
    -  final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +  final def finish(status: FinalApplicationStatus, code: Int, msg: String = null) = synchronized {
         if (!finished) {
    -      logInfo(s"Finishing ApplicationMaster with $status"  +
    -        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    -      finished = true
    +      logInfo(s"Final app status: ${status}, exitCode: ${code}" +
    +        Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
    +      exitCode = code
           finalStatus = status
    -      try {
    -        if (Thread.currentThread() != reporterThread) {
    +      finalMsg = msg
    +      finished = true
    +      if (Thread.currentThread() != reporterThread && Option(reporterThread).isDefined) {
    +        logDebug("shutting down reporter thread")
    +        try {
               reporterThread.interrupt()
    -          reporterThread.join()
    +        } catch {
    +          case e => {
    +            logError("Exception trying to shutdown report thread", e)
    +            // since main thread (client mode) could be waiting on this to finish
    +            // just exit here
    +            System.exit(ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION)
    +          }
    +        }
    +      }
    +      if (Thread.currentThread() != userClassThread && Option(userClassThread).isDefined) {
    +        val sc = sparkContextRef.get()
    +        if (sc != null) {
    +          logInfo("Invoking sc stop from finish")
    +          sc.stop()
    +        }
    +        logDebug("shutting down user thread")
    +        try {
    +          // since we don't know what the user thread is doing at the time
    +          // of interrupt catch exception so we still exit. For instance
    +          // we could get a java.nio.channels.ClosedByInterruptException.
    +          userClassThread.interrupt()
    +        } catch {
    +          case e => {
    --- End diff --
    
    nit: the style I generally see does not have '{}' in these blocks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18459205
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -383,40 +405,82 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
         }
       }
     
    +  /**
    +   * This system security manager applies to the entire process.
    +   * It's main purpose is to handle the case if the user code does a System.exit.
    +   * This allows us to catch that and properly set the YARN application status and
    +   * cleanup if needed.
    +   */
    +  private def setupSystemSecurityManager() = {
    +    try {
    +      var stopped = false
    +      System.setSecurityManager(new java.lang.SecurityManager() {
    +        override def checkExit(paramInt: Int) {
    +          if (!stopped) {
    +            logInfo("In securityManager checkExit, exit code: " + paramInt)
    +            if (paramInt == 0) {
    +              finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
    +            } else {
    +              finish(FinalApplicationStatus.FAILED,
    +                paramInt,
    +                "User class exited with non-zero exit code")
    +            }
    +            stopped = true
    +          }
    +        }
    +        // required for the checkExit to work properly
    +        override def checkPermission(perm: java.security.Permission): Unit = {
    +        }
    --- End diff --
    
    In the future please clarify what you want bumped up as you said this prior and I thought you meant remove the extra space between 430 and 431.  I assume you actually mean the }


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2577#issuecomment-57818940
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21250/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2577#issuecomment-58037924
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21330/consoleFull) for   PR 2577 at commit [`9c2efbf`](https://github.com/apache/spark/commit/9c2efbfd8d199bf89f911e44c7b07c6afe6b15bd).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class CacheTableCommand(tableName: String, plan: Option[LogicalPlan], isLazy: Boolean)`
      * `case class UncacheTableCommand(tableName: String) extends Command`
      * `case class CacheTableCommand(`
      * `case class UncacheTableCommand(tableName: String) extends LeafNode with Command `
      * `case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18174328
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -71,80 +74,134 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       private val sparkContextRef = new AtomicReference[SparkContext](null)
     
       final def run(): Int = {
    -    val appAttemptId = client.getAttemptId()
    +    try {
    +      val appAttemptId = client.getAttemptId()
     
    -    if (isDriver) {
    -      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    -      // other spark processes running on the same box
    -      System.setProperty("spark.ui.port", "0")
    +      if (isDriver) {
    +        // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +        // other spark processes running on the same box
    +        System.setProperty("spark.ui.port", "0")
     
    -      // Set the master property to match the requested mode.
    -      System.setProperty("spark.master", "yarn-cluster")
    +        // Set the master property to match the requested mode.
    +        System.setProperty("spark.master", "yarn-cluster")
     
    -      // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
    -      System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    -    }
    +        // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
    +        System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    +      }
     
    -    logInfo("ApplicationAttemptId: " + appAttemptId)
    +      logInfo("ApplicationAttemptId: " + appAttemptId)
     
    -    val cleanupHook = new Runnable {
    -      override def run() {
    -        // If the SparkContext is still registered, shut it down as a best case effort in case
    -        // users do not call sc.stop or do System.exit().
    -        val sc = sparkContextRef.get()
    -        if (sc != null) {
    -          logInfo("Invoking sc stop from shutdown hook")
    -          sc.stop()
    -          finish(FinalApplicationStatus.SUCCEEDED)
    -        }
    +      val cleanupHook = new Runnable {
    +        override def run() {
    +          // If the SparkContext is still registered, shut it down as a best case effort in case
    +          // users do not call sc.stop or do System.exit().
    +          val sc = sparkContextRef.get()
    +          if (sc != null) {
    +            logInfo("Invoking sc stop from shutdown hook")
    +            sc.stop()
    +          }
    +          val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    +          val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    +
    +          if (!finished) {
    +            // this shouldn't ever happen, but if it does assume weird failure
    +            finish(FinalApplicationStatus.FAILED,
    +              ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +              "shutdown hook called without cleanly finishing")
    +          }
     
    -        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
    -        // running the AM.
    -        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    -        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    -        if (finished || isLastAttempt) {
    -          cleanupStagingDir()
    +          if (!unregistered) {
    +            // we only want to unregister if we don't want the RM to retry
    +            if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
    +              unregister(finalStatus, finalMsg)
    +              cleanupStagingDir()
    +            }
    +          }
             }
           }
    -    }
     
    -    // Use higher priority than FileSystem.
    -    assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
    -    ShutdownHookManager
    -      .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
    +      // Use higher priority than FileSystem.
    +      assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
    +      ShutdownHookManager
    +        .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
     
    -    // Call this to force generation of secret so it gets populated into the
    -    // Hadoop UGI. This has to happen before the startUserClass which does a
    -    // doAs in order for the credentials to be passed on to the executor containers.
    -    val securityMgr = new SecurityManager(sparkConf)
    +      // Call this to force generation of secret so it gets populated into the
    +      // Hadoop UGI. This has to happen before the startUserClass which does a
    +      // doAs in order for the credentials to be passed on to the executor containers.
    +      val securityMgr = new SecurityManager(sparkConf)
     
    -    if (isDriver) {
    -      runDriver(securityMgr)
    -    } else {
    -      runExecutorLauncher(securityMgr)
    +      if (isDriver) {
    +        runDriver(securityMgr)
    +      } else {
    +        runExecutorLauncher(securityMgr)
    +      }
    +    } catch {
    +      case e: Throwable => {
    +        // catch everything else if not specifically handled
    +        logError("Uncaught exception: ", e)
    +        finish(FinalApplicationStatus.FAILED,
    +          ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +          "Uncaught exception: " + e.getMessage())
    +      }
         }
    +    exitCode
    +  }
     
    -    if (finalStatus != FinalApplicationStatus.UNDEFINED) {
    -      finish(finalStatus)
    -      0
    -    } else {
    -      1
    +  /**
    +   * unregister is used to completely unregister the application from the ResourceManager.
    +   * This means the ResourceManager will not retry the application attempt on your behalf if
    +   * a failure occurred.
    +   */
    +  final def unregister(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +    if (!unregistered) {
    +      logInfo(s"Unregistering ApplicationMaster with $status" +
    +        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    +      unregistered = true
    +      client.unregister(status, Option(diagnostics).getOrElse(""))
         }
       }
     
    -  final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +  final def finish(status: FinalApplicationStatus, code: Int, msg: String = null) = synchronized {
         if (!finished) {
    -      logInfo(s"Finishing ApplicationMaster with $status"  +
    -        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    -      finished = true
    +      logInfo(s"Final app status: ${status}, exitCode: ${code}" +
    +        Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
    +      exitCode = code
           finalStatus = status
    -      try {
    -        if (Thread.currentThread() != reporterThread) {
    +      finalMsg = msg
    +      finished = true
    +      if (Thread.currentThread() != reporterThread && Option(reporterThread).isDefined) {
    +        logDebug("shutting down reporter thread")
    +        try {
               reporterThread.interrupt()
    -          reporterThread.join()
    +        } catch {
    +          case e => {
    +            logError("Exception trying to shutdown report thread", e)
    +            // since main thread (client mode) could be waiting on this to finish
    +            // just exit here
    +            System.exit(ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION)
    +          }
    +        }
    +      }
    +      if (Thread.currentThread() != userClassThread && Option(userClassThread).isDefined) {
    +        val sc = sparkContextRef.get()
    +        if (sc != null) {
    +          logInfo("Invoking sc stop from finish")
    +          sc.stop()
    +        }
    +        logDebug("shutting down user thread")
    +        try {
    +          // since we don't know what the user thread is doing at the time
    +          // of interrupt catch exception so we still exit. For instance
    +          // we could get a java.nio.channels.ClosedByInterruptException.
    +          userClassThread.interrupt()
    +        } catch {
    +          case e => {
    +            logError("Exception trying to shutdown user thread", e)
    --- End diff --
    
    Same question about `Thread.interrupt()`.
    
    Also, might be useful to check if the thread is alive before trying to interrupt it; although it's probably ok the way it is in your current code. (I was mostly thinking about log messages but it seems all of them are `logDebug`.)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18239755
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -383,40 +404,80 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
         }
       }
     
    +  /**
    +   * Start the user class, which contains the spark driver.
    +   * If the main routine exits cleanly or exits with System.exit(0) we
    +   * assume it was successful, for all other cases we assume failure.
    +   */
       private def startUserClass(): Thread = {
         logInfo("Starting the user JAR in a separate Thread")
         System.setProperty("spark.executor.instances", args.numExecutors.toString)
    +    var stopped = false
         val mainMethod = Class.forName(args.userClass, false,
           Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     
    -    userClassThread = new Thread {
    +    val userThread = new Thread {
           override def run() {
    -        var status = FinalApplicationStatus.FAILED
    +
    +        try {
    +          // Note this security manager applies to the entire process, not
    +          // just this thread. It's here to handle the case if the user code
    +          // does System.exit
    +          System.setSecurityManager(new java.lang.SecurityManager() {
    +            override def checkExit(paramInt: Int) {
    +              if (!stopped) {
    +                logInfo("In securityManager checkExit, exit code: " + paramInt)
    +                if (paramInt == 0) {
    +                  finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
    +                } else {
    +                  finish(FinalApplicationStatus.FAILED,
    +                    paramInt,
    +                    "User class exited with non-zero exit code")
    +                }
    +                stopped = true
    +              }
    +            }
    +
    +            // required for the checkExit to work properly
    +            override def checkPermission(perm: java.security.Permission): Unit = {
    +            }
    +          })
    +        }
    +        catch {
    +          case e: SecurityException => {
    +            finish(FinalApplicationStatus.FAILED,
    +              ApplicationMaster.EXIT_SECURITY,
    +              "Error in setSecurityManager")
    +            logError("Error in setSecurityManager:", e)
    +          }
    +        }
    +
    --- End diff --
    
    No it could be pulled out, but its also not needed for yarn-client mode


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18239726
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -71,80 +74,108 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       private val sparkContextRef = new AtomicReference[SparkContext](null)
     
       final def run(): Int = {
    -    val appAttemptId = client.getAttemptId()
    +    try {
    +      val appAttemptId = client.getAttemptId()
     
    -    if (isDriver) {
    -      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    -      // other spark processes running on the same box
    -      System.setProperty("spark.ui.port", "0")
    +      if (isDriver) {
    +        // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +        // other spark processes running on the same box
    +        System.setProperty("spark.ui.port", "0")
     
    -      // Set the master property to match the requested mode.
    -      System.setProperty("spark.master", "yarn-cluster")
    +        // Set the master property to match the requested mode.
    +        System.setProperty("spark.master", "yarn-cluster")
     
    -      // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
    -      System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    -    }
    +        // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
    +        System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    +      }
     
    -    logInfo("ApplicationAttemptId: " + appAttemptId)
    +      logInfo("ApplicationAttemptId: " + appAttemptId)
     
    -    val cleanupHook = new Runnable {
    -      override def run() {
    -        // If the SparkContext is still registered, shut it down as a best case effort in case
    -        // users do not call sc.stop or do System.exit().
    -        val sc = sparkContextRef.get()
    -        if (sc != null) {
    -          logInfo("Invoking sc stop from shutdown hook")
    -          sc.stop()
    -          finish(FinalApplicationStatus.SUCCEEDED)
    -        }
    +      val cleanupHook = new Runnable {
    +        override def run() {
    +          // If the SparkContext is still registered, shut it down as a best case effort in case
    +          // users do not call sc.stop or do System.exit().
    +          val sc = sparkContextRef.get()
    +          if (sc != null) {
    +            logInfo("Invoking sc stop from shutdown hook")
    +            sc.stop()
    +          }
    +          val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    +          val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    +
    +          if (!finished) {
    +            // this shouldn't ever happen, but if it does assume weird failure
    +            finish(FinalApplicationStatus.FAILED,
    +              ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +              "shutdown hook called without cleanly finishing")
    +          }
     
    -        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
    -        // running the AM.
    -        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    -        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    -        if (finished || isLastAttempt) {
    -          cleanupStagingDir()
    +          if (!unregistered) {
    +            // we only want to unregister if we don't want the RM to retry
    +            if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
    +              unregister(finalStatus, finalMsg)
    +              cleanupStagingDir()
    +            }
    +          }
             }
           }
    -    }
     
    -    // Use higher priority than FileSystem.
    -    assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
    -    ShutdownHookManager
    -      .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
    +      // Use higher priority than FileSystem.
    +      assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
    +      ShutdownHookManager
    +        .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
     
    -    // Call this to force generation of secret so it gets populated into the
    -    // Hadoop UGI. This has to happen before the startUserClass which does a
    -    // doAs in order for the credentials to be passed on to the executor containers.
    -    val securityMgr = new SecurityManager(sparkConf)
    +      // Call this to force generation of secret so it gets populated into the
    +      // Hadoop UGI. This has to happen before the startUserClass which does a
    +      // doAs in order for the credentials to be passed on to the executor containers.
    +      val securityMgr = new SecurityManager(sparkConf)
     
    -    if (isDriver) {
    -      runDriver(securityMgr)
    -    } else {
    -      runExecutorLauncher(securityMgr)
    +      if (isDriver) {
    +        runDriver(securityMgr)
    +      } else {
    +        runExecutorLauncher(securityMgr)
    +      }
    +    } catch {
    +      case e: Throwable => {
    +        // catch everything else if not specifically handled
    +        logError("Uncaught exception: ", e)
    +        finish(FinalApplicationStatus.FAILED,
    +          ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +          "Uncaught exception: " + e.getMessage())
    +      }
    --- End diff --
    
    Do we need to rethrow the exception here? In general we should not swallow `Throwable`s


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/2577


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18251599
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -383,40 +404,80 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
         }
       }
     
    +  /**
    +   * Start the user class, which contains the spark driver.
    +   * If the main routine exits cleanly or exits with System.exit(0) we
    +   * assume it was successful, for all other cases we assume failure.
    +   */
       private def startUserClass(): Thread = {
         logInfo("Starting the user JAR in a separate Thread")
         System.setProperty("spark.executor.instances", args.numExecutors.toString)
    +    var stopped = false
         val mainMethod = Class.forName(args.userClass, false,
           Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     
    -    userClassThread = new Thread {
    +    val userThread = new Thread {
           override def run() {
    -        var status = FinalApplicationStatus.FAILED
    +
    +        try {
    +          // Note this security manager applies to the entire process, not
    +          // just this thread. It's here to handle the case if the user code
    +          // does System.exit
    +          System.setSecurityManager(new java.lang.SecurityManager() {
    +            override def checkExit(paramInt: Int) {
    +              if (!stopped) {
    +                logInfo("In securityManager checkExit, exit code: " + paramInt)
    +                if (paramInt == 0) {
    +                  finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
    +                } else {
    +                  finish(FinalApplicationStatus.FAILED,
    +                    paramInt,
    +                    "User class exited with non-zero exit code")
    +                }
    +                stopped = true
    +              }
    +            }
    +
    +            // required for the checkExit to work properly
    +            override def checkPermission(perm: java.security.Permission): Unit = {
    +            }
    +          })
    +        }
    +        catch {
    +          case e: SecurityException => {
    +            finish(FinalApplicationStatus.FAILED,
    +              ApplicationMaster.EXIT_SECURITY,
    +              "Error in setSecurityManager")
    +            logError("Error in setSecurityManager:", e)
    +          }
    +        }
    +
    --- End diff --
    
    sounds good, I'll separate it out. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/2577#issuecomment-57176392
  
    @witgo can you verify this covers https://github.com/apache/spark/pull/2311


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2577#issuecomment-57176963
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20975/consoleFull) for   PR 2577 at commit [`32f4dfa`](https://github.com/apache/spark/commit/32f4dfa5d0778e3e7bd7c5da27c71f8c3f5930f6).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18170801
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -450,6 +539,15 @@ object ApplicationMaster extends Logging {
     
       val SHUTDOWN_HOOK_PRIORITY: Int = 30
     
    +  // exit codes for different causes, no reason behind the values
    --- End diff --
    
    We can use this class?
    [ExecutorExitCode](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18395494
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -328,10 +348,18 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       private def waitForSparkDriver(): ActorRef = {
         logInfo("Waiting for Spark driver to be reachable.")
         var driverUp = false
    +    var count = 0
         val hostport = args.userArgs(0)
         val (driverHost, driverPort) = Utils.parseHostPort(hostport)
    -    while (!driverUp) {
    +
    +    // spark driver should already be up since it launched us, but we don't want to
    +    // wait forever, so wait 100 seconds max to match the cluster mode setting.
    +    // Leave this config unpublished for now.
    +    val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.client.waitTries", 1000)
    --- End diff --
    
    https://issues.apache.org/jira/browse/SPARK-3779 filed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18418213
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -328,10 +349,18 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       private def waitForSparkDriver(): ActorRef = {
         logInfo("Waiting for Spark driver to be reachable.")
         var driverUp = false
    +    var count = 0
         val hostport = args.userArgs(0)
         val (driverHost, driverPort) = Utils.parseHostPort(hostport)
    -    while (!driverUp) {
    +
    +    // spark driver should already be up since it launched us, but we don't want to
    +    // wait forever, so wait 100 seconds max to match the cluster mode setting.
    +    // Leave this config unpublished for now.
    --- End diff --
    
    minor, but can you add `SPARK-3779` to the comment so others know we're tracking this issue?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/2577#issuecomment-58136343
  
    LGTM, feel free to merge it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18395371
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -328,10 +348,18 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       private def waitForSparkDriver(): ActorRef = {
         logInfo("Waiting for Spark driver to be reachable.")
         var driverUp = false
    +    var count = 0
         val hostport = args.userArgs(0)
         val (driverHost, driverPort) = Utils.parseHostPort(hostport)
    -    while (!driverUp) {
    +
    +    // spark driver should already be up since it launched us, but we don't want to
    +    // wait forever, so wait 100 seconds max to match the cluster mode setting.
    +    // Leave this config unpublished for now.
    +    val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.client.waitTries", 1000)
    --- End diff --
    
    ok for this pr I'll leave it applicationMaster.waitTries and match cluster mode and I'll file a separate jira to clean it up. The documentation doesn't state how long each loop is for example.  I think these would be better to just change to be a wait times versus number of tries and then they can be used for both modes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/2577#issuecomment-57209937
  
    Looks ok to me, although the exception handling does feel a little paranoid. :-) Just had a few nits.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18243719
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -328,10 +348,18 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       private def waitForSparkDriver(): ActorRef = {
         logInfo("Waiting for Spark driver to be reachable.")
         var driverUp = false
    +    var count = 0
         val hostport = args.userArgs(0)
         val (driverHost, driverPort) = Utils.parseHostPort(hostport)
    -    while (!driverUp) {
    +
    +    // spark driver should already be up since it launched us, but we don't want to
    +    // wait forever, so wait 100 seconds max to match the cluster mode setting.
    +    // Leave this config unpublished for now.
    +    val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.client.waitTries", 1000)
    --- End diff --
    
    yes the client was tacked on to mean it used in the client mode because the timing of the loops are different between the modes.  Its an internal config right now so user shouldn't be setting. The timing is different because client mode is already up when this is launched, versus in cluster mode we are launching the user code, which takes some times (10's of seconds).
    
    I'll file a separate jira to fix up the mismatch in doc/config.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/2577#issuecomment-58197006
  
    Thanks @andrewor14.  I've merged this into 1.2


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18418362
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -383,40 +405,82 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
         }
       }
     
    +  /**
    +   * This system security manager applies to the entire process.
    +   * It's main purpose is to handle the case if the user code does a System.exit.
    +   * This allows us to catch that and properly set the YARN application status and
    +   * cleanup if needed.
    +   */
    +  private def setupSystemSecurityManager() = {
    +    try {
    +      var stopped = false
    +      System.setSecurityManager(new java.lang.SecurityManager() {
    +        override def checkExit(paramInt: Int) {
    +          if (!stopped) {
    +            logInfo("In securityManager checkExit, exit code: " + paramInt)
    +            if (paramInt == 0) {
    +              finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
    +            } else {
    +              finish(FinalApplicationStatus.FAILED,
    +                paramInt,
    +                "User class exited with non-zero exit code")
    +            }
    +            stopped = true
    +          }
    +        }
    +        // required for the checkExit to work properly
    +        override def checkPermission(perm: java.security.Permission): Unit = {
    +        }
    +      })
    +    }
    +    catch {
    +      case e: SecurityException =>
    +        finish(FinalApplicationStatus.FAILED,
    +          ApplicationMaster.EXIT_SECURITY,
    +          "Error in setSecurityManager")
    +        logError("Error in setSecurityManager:", e)
    +    }
    +  }
    +
    +  /**
    +   * Start the user class, which contains the spark driver, in a separate Thread.
    +   * If the main routine exits cleanly or exits with System.exit(0) we
    +   * assume it was successful, for all other cases we assume failure.
    +   *
    +   * Returns the user thread that was started.
    +   */
       private def startUserClass(): Thread = {
         logInfo("Starting the user JAR in a separate Thread")
         System.setProperty("spark.executor.instances", args.numExecutors.toString)
         val mainMethod = Class.forName(args.userClass, false,
           Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     
    -    userClassThread = new Thread {
    +    val userThread = new Thread {
           override def run() {
    -        var status = FinalApplicationStatus.FAILED
             try {
    -          // Copy
               val mainArgs = new Array[String](args.userArgs.size)
               args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
               mainMethod.invoke(null, mainArgs)
    -          // Some apps have "System.exit(0)" at the end.  The user thread will stop here unless
    -          // it has an uncaught exception thrown out.  It needs a shutdown hook to set SUCCEEDED.
    -          status = FinalApplicationStatus.SUCCEEDED
    +          finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
    +          logDebug("Done running users class")
             } catch {
               case e: InvocationTargetException =>
                 e.getCause match {
                   case _: InterruptedException =>
                     // Reporter thread can interrupt to stop user class
    -
    -              case e => throw e
    +              case e: Throwable =>
    --- End diff --
    
    I still think we should catch only Exception here. All we ever do in `finish` is to kill the threads and log the exit code, and if we get a really bad `Throwable` that kills the JVM then these threads won't survive anyway. It's just that the JVM is not guaranteed to do whatever `finish` does properly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18180991
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -71,80 +74,134 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       private val sparkContextRef = new AtomicReference[SparkContext](null)
     
       final def run(): Int = {
    -    val appAttemptId = client.getAttemptId()
    +    try {
    +      val appAttemptId = client.getAttemptId()
     
    -    if (isDriver) {
    -      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    -      // other spark processes running on the same box
    -      System.setProperty("spark.ui.port", "0")
    +      if (isDriver) {
    +        // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +        // other spark processes running on the same box
    +        System.setProperty("spark.ui.port", "0")
     
    -      // Set the master property to match the requested mode.
    -      System.setProperty("spark.master", "yarn-cluster")
    +        // Set the master property to match the requested mode.
    +        System.setProperty("spark.master", "yarn-cluster")
     
    -      // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
    -      System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    -    }
    +        // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
    +        System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    +      }
     
    -    logInfo("ApplicationAttemptId: " + appAttemptId)
    +      logInfo("ApplicationAttemptId: " + appAttemptId)
     
    -    val cleanupHook = new Runnable {
    -      override def run() {
    -        // If the SparkContext is still registered, shut it down as a best case effort in case
    -        // users do not call sc.stop or do System.exit().
    -        val sc = sparkContextRef.get()
    -        if (sc != null) {
    -          logInfo("Invoking sc stop from shutdown hook")
    -          sc.stop()
    -          finish(FinalApplicationStatus.SUCCEEDED)
    -        }
    +      val cleanupHook = new Runnable {
    +        override def run() {
    +          // If the SparkContext is still registered, shut it down as a best case effort in case
    +          // users do not call sc.stop or do System.exit().
    +          val sc = sparkContextRef.get()
    +          if (sc != null) {
    +            logInfo("Invoking sc stop from shutdown hook")
    +            sc.stop()
    +          }
    +          val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    +          val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    +
    +          if (!finished) {
    +            // this shouldn't ever happen, but if it does assume weird failure
    +            finish(FinalApplicationStatus.FAILED,
    +              ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +              "shutdown hook called without cleanly finishing")
    +          }
     
    -        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
    -        // running the AM.
    -        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    -        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    -        if (finished || isLastAttempt) {
    -          cleanupStagingDir()
    +          if (!unregistered) {
    +            // we only want to unregister if we don't want the RM to retry
    +            if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
    +              unregister(finalStatus, finalMsg)
    +              cleanupStagingDir()
    +            }
    +          }
             }
           }
    -    }
     
    -    // Use higher priority than FileSystem.
    -    assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
    -    ShutdownHookManager
    -      .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
    +      // Use higher priority than FileSystem.
    +      assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
    +      ShutdownHookManager
    +        .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
     
    -    // Call this to force generation of secret so it gets populated into the
    -    // Hadoop UGI. This has to happen before the startUserClass which does a
    -    // doAs in order for the credentials to be passed on to the executor containers.
    -    val securityMgr = new SecurityManager(sparkConf)
    +      // Call this to force generation of secret so it gets populated into the
    +      // Hadoop UGI. This has to happen before the startUserClass which does a
    +      // doAs in order for the credentials to be passed on to the executor containers.
    +      val securityMgr = new SecurityManager(sparkConf)
     
    -    if (isDriver) {
    -      runDriver(securityMgr)
    -    } else {
    -      runExecutorLauncher(securityMgr)
    +      if (isDriver) {
    +        runDriver(securityMgr)
    +      } else {
    +        runExecutorLauncher(securityMgr)
    +      }
    +    } catch {
    +      case e: Throwable => {
    +        // catch everything else if not specifically handled
    +        logError("Uncaught exception: ", e)
    +        finish(FinalApplicationStatus.FAILED,
    +          ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +          "Uncaught exception: " + e.getMessage())
    +      }
         }
    +    exitCode
    +  }
     
    -    if (finalStatus != FinalApplicationStatus.UNDEFINED) {
    -      finish(finalStatus)
    -      0
    -    } else {
    -      1
    +  /**
    +   * unregister is used to completely unregister the application from the ResourceManager.
    +   * This means the ResourceManager will not retry the application attempt on your behalf if
    +   * a failure occurred.
    +   */
    +  final def unregister(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +    if (!unregistered) {
    +      logInfo(s"Unregistering ApplicationMaster with $status" +
    +        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    +      unregistered = true
    +      client.unregister(status, Option(diagnostics).getOrElse(""))
         }
       }
     
    -  final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +  final def finish(status: FinalApplicationStatus, code: Int, msg: String = null) = synchronized {
         if (!finished) {
    -      logInfo(s"Finishing ApplicationMaster with $status"  +
    -        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    -      finished = true
    +      logInfo(s"Final app status: ${status}, exitCode: ${code}" +
    +        Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
    +      exitCode = code
           finalStatus = status
    -      try {
    -        if (Thread.currentThread() != reporterThread) {
    +      finalMsg = msg
    +      finished = true
    +      if (Thread.currentThread() != reporterThread && Option(reporterThread).isDefined) {
    +        logDebug("shutting down reporter thread")
    +        try {
               reporterThread.interrupt()
    -          reporterThread.join()
    +        } catch {
    +          case e => {
    +            logError("Exception trying to shutdown report thread", e)
    +            // since main thread (client mode) could be waiting on this to finish
    +            // just exit here
    +            System.exit(ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION)
    +          }
    +        }
    +      }
    +      if (Thread.currentThread() != userClassThread && Option(userClassThread).isDefined) {
    +        val sc = sparkContextRef.get()
    +        if (sc != null) {
    +          logInfo("Invoking sc stop from finish")
    +          sc.stop()
    +        }
    +        logDebug("shutting down user thread")
    +        try {
    +          // since we don't know what the user thread is doing at the time
    +          // of interrupt catch exception so we still exit. For instance
    +          // we could get a java.nio.channels.ClosedByInterruptException.
    +          userClassThread.interrupt()
    +        } catch {
    +          case e => {
    +            logError("Exception trying to shutdown user thread", e)
    --- End diff --
    
    Nah, it's ok the way it is.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18418613
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -383,40 +405,82 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
         }
       }
     
    +  /**
    +   * This system security manager applies to the entire process.
    +   * It's main purpose is to handle the case if the user code does a System.exit.
    +   * This allows us to catch that and properly set the YARN application status and
    +   * cleanup if needed.
    +   */
    +  private def setupSystemSecurityManager() = {
    --- End diff --
    
    can you add `: Unit`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18239431
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -383,40 +404,80 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
         }
       }
     
    +  /**
    +   * Start the user class, which contains the spark driver.
    +   * If the main routine exits cleanly or exits with System.exit(0) we
    +   * assume it was successful, for all other cases we assume failure.
    +   */
       private def startUserClass(): Thread = {
         logInfo("Starting the user JAR in a separate Thread")
         System.setProperty("spark.executor.instances", args.numExecutors.toString)
    +    var stopped = false
         val mainMethod = Class.forName(args.userClass, false,
           Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     
    -    userClassThread = new Thread {
    +    val userThread = new Thread {
           override def run() {
    -        var status = FinalApplicationStatus.FAILED
    +
    +        try {
    +          // Note this security manager applies to the entire process, not
    +          // just this thread. It's here to handle the case if the user code
    +          // does System.exit
    +          System.setSecurityManager(new java.lang.SecurityManager() {
    +            override def checkExit(paramInt: Int) {
    +              if (!stopped) {
    +                logInfo("In securityManager checkExit, exit code: " + paramInt)
    +                if (paramInt == 0) {
    +                  finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
    +                } else {
    +                  finish(FinalApplicationStatus.FAILED,
    +                    paramInt,
    +                    "User class exited with non-zero exit code")
    +                }
    +                stopped = true
    +              }
    +            }
    +
    +            // required for the checkExit to work properly
    +            override def checkPermission(perm: java.security.Permission): Unit = {
    +            }
    +          })
    +        }
    +        catch {
    +          case e: SecurityException => {
    --- End diff --
    
    nit: no need for `{` here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/2577#issuecomment-57807019
  
    Addressed all the review comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/2577#issuecomment-58037937
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21330/Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2577#issuecomment-57808148
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21248/consoleFull) for   PR 2577 at commit [`24c98e3`](https://github.com/apache/spark/commit/24c98e3154ee2ee93dc4c958ac982b534a798972).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/2577#issuecomment-57185274
  
    also note this does change everything to allow yarn to retry. previously when it hit the maximum number of executor failures it didn't retry the AM.  I waffled back and forth on this one.  At first the thought was that if that many executors are dying its probably an issue with the user code, but then again if you have a really long running job then I can think of situations you want it to retry.    Anyone have strong opinion on that?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18174148
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -71,80 +74,134 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       private val sparkContextRef = new AtomicReference[SparkContext](null)
     
       final def run(): Int = {
    -    val appAttemptId = client.getAttemptId()
    +    try {
    +      val appAttemptId = client.getAttemptId()
     
    -    if (isDriver) {
    -      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    -      // other spark processes running on the same box
    -      System.setProperty("spark.ui.port", "0")
    +      if (isDriver) {
    +        // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +        // other spark processes running on the same box
    +        System.setProperty("spark.ui.port", "0")
     
    -      // Set the master property to match the requested mode.
    -      System.setProperty("spark.master", "yarn-cluster")
    +        // Set the master property to match the requested mode.
    +        System.setProperty("spark.master", "yarn-cluster")
     
    -      // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
    -      System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    -    }
    +        // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
    +        System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    +      }
     
    -    logInfo("ApplicationAttemptId: " + appAttemptId)
    +      logInfo("ApplicationAttemptId: " + appAttemptId)
     
    -    val cleanupHook = new Runnable {
    -      override def run() {
    -        // If the SparkContext is still registered, shut it down as a best case effort in case
    -        // users do not call sc.stop or do System.exit().
    -        val sc = sparkContextRef.get()
    -        if (sc != null) {
    -          logInfo("Invoking sc stop from shutdown hook")
    -          sc.stop()
    -          finish(FinalApplicationStatus.SUCCEEDED)
    -        }
    +      val cleanupHook = new Runnable {
    +        override def run() {
    +          // If the SparkContext is still registered, shut it down as a best case effort in case
    +          // users do not call sc.stop or do System.exit().
    +          val sc = sparkContextRef.get()
    +          if (sc != null) {
    +            logInfo("Invoking sc stop from shutdown hook")
    +            sc.stop()
    +          }
    +          val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    +          val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    +
    +          if (!finished) {
    +            // this shouldn't ever happen, but if it does assume weird failure
    +            finish(FinalApplicationStatus.FAILED,
    +              ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +              "shutdown hook called without cleanly finishing")
    +          }
     
    -        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
    -        // running the AM.
    -        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    -        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    -        if (finished || isLastAttempt) {
    -          cleanupStagingDir()
    +          if (!unregistered) {
    +            // we only want to unregister if we don't want the RM to retry
    +            if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
    +              unregister(finalStatus, finalMsg)
    +              cleanupStagingDir()
    +            }
    +          }
             }
           }
    -    }
     
    -    // Use higher priority than FileSystem.
    -    assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
    -    ShutdownHookManager
    -      .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
    +      // Use higher priority than FileSystem.
    +      assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
    +      ShutdownHookManager
    +        .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
     
    -    // Call this to force generation of secret so it gets populated into the
    -    // Hadoop UGI. This has to happen before the startUserClass which does a
    -    // doAs in order for the credentials to be passed on to the executor containers.
    -    val securityMgr = new SecurityManager(sparkConf)
    +      // Call this to force generation of secret so it gets populated into the
    +      // Hadoop UGI. This has to happen before the startUserClass which does a
    +      // doAs in order for the credentials to be passed on to the executor containers.
    +      val securityMgr = new SecurityManager(sparkConf)
     
    -    if (isDriver) {
    -      runDriver(securityMgr)
    -    } else {
    -      runExecutorLauncher(securityMgr)
    +      if (isDriver) {
    +        runDriver(securityMgr)
    +      } else {
    +        runExecutorLauncher(securityMgr)
    +      }
    +    } catch {
    +      case e: Throwable => {
    +        // catch everything else if not specifically handled
    +        logError("Uncaught exception: ", e)
    +        finish(FinalApplicationStatus.FAILED,
    +          ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +          "Uncaught exception: " + e.getMessage())
    +      }
         }
    +    exitCode
    +  }
     
    -    if (finalStatus != FinalApplicationStatus.UNDEFINED) {
    -      finish(finalStatus)
    -      0
    -    } else {
    -      1
    +  /**
    +   * unregister is used to completely unregister the application from the ResourceManager.
    +   * This means the ResourceManager will not retry the application attempt on your behalf if
    +   * a failure occurred.
    +   */
    +  final def unregister(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +    if (!unregistered) {
    +      logInfo(s"Unregistering ApplicationMaster with $status" +
    +        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    +      unregistered = true
    +      client.unregister(status, Option(diagnostics).getOrElse(""))
         }
       }
     
    -  final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +  final def finish(status: FinalApplicationStatus, code: Int, msg: String = null) = synchronized {
         if (!finished) {
    -      logInfo(s"Finishing ApplicationMaster with $status"  +
    -        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    -      finished = true
    +      logInfo(s"Final app status: ${status}, exitCode: ${code}" +
    +        Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
    +      exitCode = code
           finalStatus = status
    -      try {
    -        if (Thread.currentThread() != reporterThread) {
    +      finalMsg = msg
    +      finished = true
    +      if (Thread.currentThread() != reporterThread && Option(reporterThread).isDefined) {
    +        logDebug("shutting down reporter thread")
    +        try {
               reporterThread.interrupt()
    -          reporterThread.join()
    +        } catch {
    +          case e => {
    +            logError("Exception trying to shutdown report thread", e)
    --- End diff --
    
    Can `Thread.interrupt()` actually throw anything? The only declared exception is `SecurityException` and we don't have a security manager.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18179848
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -71,80 +74,134 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       private val sparkContextRef = new AtomicReference[SparkContext](null)
     
       final def run(): Int = {
    -    val appAttemptId = client.getAttemptId()
    +    try {
    +      val appAttemptId = client.getAttemptId()
     
    -    if (isDriver) {
    -      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    -      // other spark processes running on the same box
    -      System.setProperty("spark.ui.port", "0")
    +      if (isDriver) {
    +        // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +        // other spark processes running on the same box
    +        System.setProperty("spark.ui.port", "0")
     
    -      // Set the master property to match the requested mode.
    -      System.setProperty("spark.master", "yarn-cluster")
    +        // Set the master property to match the requested mode.
    +        System.setProperty("spark.master", "yarn-cluster")
     
    -      // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
    -      System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    -    }
    +        // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
    +        System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    +      }
     
    -    logInfo("ApplicationAttemptId: " + appAttemptId)
    +      logInfo("ApplicationAttemptId: " + appAttemptId)
     
    -    val cleanupHook = new Runnable {
    -      override def run() {
    -        // If the SparkContext is still registered, shut it down as a best case effort in case
    -        // users do not call sc.stop or do System.exit().
    -        val sc = sparkContextRef.get()
    -        if (sc != null) {
    -          logInfo("Invoking sc stop from shutdown hook")
    -          sc.stop()
    -          finish(FinalApplicationStatus.SUCCEEDED)
    -        }
    +      val cleanupHook = new Runnable {
    +        override def run() {
    +          // If the SparkContext is still registered, shut it down as a best case effort in case
    +          // users do not call sc.stop or do System.exit().
    +          val sc = sparkContextRef.get()
    +          if (sc != null) {
    +            logInfo("Invoking sc stop from shutdown hook")
    +            sc.stop()
    +          }
    +          val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    +          val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    +
    +          if (!finished) {
    +            // this shouldn't ever happen, but if it does assume weird failure
    +            finish(FinalApplicationStatus.FAILED,
    +              ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +              "shutdown hook called without cleanly finishing")
    +          }
     
    -        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
    -        // running the AM.
    -        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    -        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    -        if (finished || isLastAttempt) {
    -          cleanupStagingDir()
    +          if (!unregistered) {
    +            // we only want to unregister if we don't want the RM to retry
    +            if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
    +              unregister(finalStatus, finalMsg)
    +              cleanupStagingDir()
    +            }
    +          }
             }
           }
    -    }
     
    -    // Use higher priority than FileSystem.
    -    assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
    -    ShutdownHookManager
    -      .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
    +      // Use higher priority than FileSystem.
    +      assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
    +      ShutdownHookManager
    +        .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
     
    -    // Call this to force generation of secret so it gets populated into the
    -    // Hadoop UGI. This has to happen before the startUserClass which does a
    -    // doAs in order for the credentials to be passed on to the executor containers.
    -    val securityMgr = new SecurityManager(sparkConf)
    +      // Call this to force generation of secret so it gets populated into the
    +      // Hadoop UGI. This has to happen before the startUserClass which does a
    +      // doAs in order for the credentials to be passed on to the executor containers.
    +      val securityMgr = new SecurityManager(sparkConf)
     
    -    if (isDriver) {
    -      runDriver(securityMgr)
    -    } else {
    -      runExecutorLauncher(securityMgr)
    +      if (isDriver) {
    +        runDriver(securityMgr)
    +      } else {
    +        runExecutorLauncher(securityMgr)
    +      }
    +    } catch {
    +      case e: Throwable => {
    +        // catch everything else if not specifically handled
    +        logError("Uncaught exception: ", e)
    +        finish(FinalApplicationStatus.FAILED,
    +          ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +          "Uncaught exception: " + e.getMessage())
    +      }
         }
    +    exitCode
    +  }
     
    -    if (finalStatus != FinalApplicationStatus.UNDEFINED) {
    -      finish(finalStatus)
    -      0
    -    } else {
    -      1
    +  /**
    +   * unregister is used to completely unregister the application from the ResourceManager.
    +   * This means the ResourceManager will not retry the application attempt on your behalf if
    +   * a failure occurred.
    +   */
    +  final def unregister(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +    if (!unregistered) {
    +      logInfo(s"Unregistering ApplicationMaster with $status" +
    +        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    +      unregistered = true
    +      client.unregister(status, Option(diagnostics).getOrElse(""))
         }
       }
     
    -  final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +  final def finish(status: FinalApplicationStatus, code: Int, msg: String = null) = synchronized {
         if (!finished) {
    -      logInfo(s"Finishing ApplicationMaster with $status"  +
    -        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    -      finished = true
    +      logInfo(s"Final app status: ${status}, exitCode: ${code}" +
    +        Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
    +      exitCode = code
           finalStatus = status
    -      try {
    -        if (Thread.currentThread() != reporterThread) {
    +      finalMsg = msg
    +      finished = true
    +      if (Thread.currentThread() != reporterThread && Option(reporterThread).isDefined) {
    +        logDebug("shutting down reporter thread")
    +        try {
               reporterThread.interrupt()
    -          reporterThread.join()
    +        } catch {
    +          case e => {
    +            logError("Exception trying to shutdown report thread", e)
    --- End diff --
    
    ah good catch. I occasionally saw the ClosedByInterruptException when testing an earlier version of the code so had put this in being paranoid but I bet it was just the log statements from the other threads.    


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18181156
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -71,80 +74,134 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       private val sparkContextRef = new AtomicReference[SparkContext](null)
     
       final def run(): Int = {
    -    val appAttemptId = client.getAttemptId()
    +    try {
    +      val appAttemptId = client.getAttemptId()
     
    -    if (isDriver) {
    -      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    -      // other spark processes running on the same box
    -      System.setProperty("spark.ui.port", "0")
    +      if (isDriver) {
    +        // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +        // other spark processes running on the same box
    +        System.setProperty("spark.ui.port", "0")
     
    -      // Set the master property to match the requested mode.
    -      System.setProperty("spark.master", "yarn-cluster")
    +        // Set the master property to match the requested mode.
    +        System.setProperty("spark.master", "yarn-cluster")
     
    -      // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
    -      System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    -    }
    +        // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
    +        System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    +      }
     
    -    logInfo("ApplicationAttemptId: " + appAttemptId)
    +      logInfo("ApplicationAttemptId: " + appAttemptId)
     
    -    val cleanupHook = new Runnable {
    -      override def run() {
    -        // If the SparkContext is still registered, shut it down as a best case effort in case
    -        // users do not call sc.stop or do System.exit().
    -        val sc = sparkContextRef.get()
    -        if (sc != null) {
    -          logInfo("Invoking sc stop from shutdown hook")
    -          sc.stop()
    -          finish(FinalApplicationStatus.SUCCEEDED)
    -        }
    +      val cleanupHook = new Runnable {
    +        override def run() {
    +          // If the SparkContext is still registered, shut it down as a best case effort in case
    +          // users do not call sc.stop or do System.exit().
    +          val sc = sparkContextRef.get()
    +          if (sc != null) {
    +            logInfo("Invoking sc stop from shutdown hook")
    +            sc.stop()
    +          }
    +          val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    +          val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    +
    +          if (!finished) {
    +            // this shouldn't ever happen, but if it does assume weird failure
    +            finish(FinalApplicationStatus.FAILED,
    +              ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +              "shutdown hook called without cleanly finishing")
    +          }
     
    -        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
    -        // running the AM.
    -        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    -        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    -        if (finished || isLastAttempt) {
    -          cleanupStagingDir()
    +          if (!unregistered) {
    +            // we only want to unregister if we don't want the RM to retry
    +            if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
    +              unregister(finalStatus, finalMsg)
    +              cleanupStagingDir()
    +            }
    +          }
             }
           }
    -    }
     
    -    // Use higher priority than FileSystem.
    -    assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
    -    ShutdownHookManager
    -      .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
    +      // Use higher priority than FileSystem.
    +      assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
    +      ShutdownHookManager
    +        .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
     
    -    // Call this to force generation of secret so it gets populated into the
    -    // Hadoop UGI. This has to happen before the startUserClass which does a
    -    // doAs in order for the credentials to be passed on to the executor containers.
    -    val securityMgr = new SecurityManager(sparkConf)
    +      // Call this to force generation of secret so it gets populated into the
    +      // Hadoop UGI. This has to happen before the startUserClass which does a
    +      // doAs in order for the credentials to be passed on to the executor containers.
    +      val securityMgr = new SecurityManager(sparkConf)
     
    -    if (isDriver) {
    -      runDriver(securityMgr)
    -    } else {
    -      runExecutorLauncher(securityMgr)
    +      if (isDriver) {
    +        runDriver(securityMgr)
    +      } else {
    +        runExecutorLauncher(securityMgr)
    +      }
    +    } catch {
    +      case e: Throwable => {
    +        // catch everything else if not specifically handled
    +        logError("Uncaught exception: ", e)
    +        finish(FinalApplicationStatus.FAILED,
    +          ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +          "Uncaught exception: " + e.getMessage())
    +      }
         }
    +    exitCode
    +  }
     
    -    if (finalStatus != FinalApplicationStatus.UNDEFINED) {
    -      finish(finalStatus)
    -      0
    -    } else {
    -      1
    +  /**
    +   * unregister is used to completely unregister the application from the ResourceManager.
    +   * This means the ResourceManager will not retry the application attempt on your behalf if
    +   * a failure occurred.
    +   */
    +  final def unregister(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +    if (!unregistered) {
    +      logInfo(s"Unregistering ApplicationMaster with $status" +
    +        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    +      unregistered = true
    +      client.unregister(status, Option(diagnostics).getOrElse(""))
         }
       }
     
    -  final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +  final def finish(status: FinalApplicationStatus, code: Int, msg: String = null) = synchronized {
         if (!finished) {
    -      logInfo(s"Finishing ApplicationMaster with $status"  +
    -        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    -      finished = true
    +      logInfo(s"Final app status: ${status}, exitCode: ${code}" +
    +        Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
    +      exitCode = code
           finalStatus = status
    -      try {
    -        if (Thread.currentThread() != reporterThread) {
    +      finalMsg = msg
    +      finished = true
    +      if (Thread.currentThread() != reporterThread && Option(reporterThread).isDefined) {
    +        logDebug("shutting down reporter thread")
    +        try {
               reporterThread.interrupt()
    -          reporterThread.join()
    +        } catch {
    +          case e => {
    +            logError("Exception trying to shutdown report thread", e)
    +            // since main thread (client mode) could be waiting on this to finish
    +            // just exit here
    +            System.exit(ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION)
    +          }
    +        }
    +      }
    +      if (Thread.currentThread() != userClassThread && Option(userClassThread).isDefined) {
    +        val sc = sparkContextRef.get()
    +        if (sc != null) {
    +          logInfo("Invoking sc stop from finish")
    +          sc.stop()
    --- End diff --
    
    I'm feeling a little bit weird about this call.
    
    Feels to me like it would be better to do it after the user thread is interrupted and the user thread stops. And since we already have a shutdown hook that takes care of calling it if the user code doesn't, that it's already handled.
    
    Is there a particular case you're thinking about here that is not covered by the current code?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18418163
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -231,33 +259,26 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
         val t = new Thread {
           override def run() {
             var failureCount = 0
    -
             while (!finished) {
               try {
    -            checkNumExecutorsFailed()
    -            if (!finished) {
    +            if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
    +              finish(FinalApplicationStatus.FAILED,
    +                ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
    +                "Max number of executor failures reached")
    +            } else {
                   logDebug("Sending progress")
                   allocator.allocateResources()
                 }
                 failureCount = 0
               } catch {
    +            case i: InterruptedException =>
                 case e: Throwable => {
                   failureCount += 1
                   if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
    --- End diff --
    
    Since we're catching `InterruptedException` here it's always gonna be `NonFatal` right? I think this check is now outdated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18174887
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -383,40 +432,80 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
         }
       }
     
    +  /**
    +   * Start the user class, which contains the spark driver.
    +   * If the main routine exits cleanly or exits with System.exit(0) we
    +   * assume it was successful, for all other cases we assume failure.
    +   */
       private def startUserClass(): Thread = {
         logInfo("Starting the user JAR in a separate Thread")
         System.setProperty("spark.executor.instances", args.numExecutors.toString)
    +    var stopped = false
         val mainMethod = Class.forName(args.userClass, false,
           Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     
    -    userClassThread = new Thread {
    +    val userThread = new Thread {
           override def run() {
    -        var status = FinalApplicationStatus.FAILED
    +
    +        try {
    +          // Note this security manager applies to the entire process, not
    +          // just this thread. Its here to handle the case if the user code
    --- End diff --
    
    nit: It's


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2577#issuecomment-57818934
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21250/consoleFull) for   PR 2577 at commit [`e8cc261`](https://github.com/apache/spark/commit/e8cc261ba8e8b639d2fd375638ae5bb0925c1411).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2577#issuecomment-57314584
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21040/consoleFull) for   PR 2577 at commit [`fab166d`](https://github.com/apache/spark/commit/fab166dba852f732049dda112daab17a491dd94c).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2577#issuecomment-57325361
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21040/consoleFull) for   PR 2577 at commit [`fab166d`](https://github.com/apache/spark/commit/fab166dba852f732049dda112daab17a491dd94c).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18182939
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -71,80 +74,134 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       private val sparkContextRef = new AtomicReference[SparkContext](null)
     
       final def run(): Int = {
    -    val appAttemptId = client.getAttemptId()
    +    try {
    +      val appAttemptId = client.getAttemptId()
     
    -    if (isDriver) {
    -      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    -      // other spark processes running on the same box
    -      System.setProperty("spark.ui.port", "0")
    +      if (isDriver) {
    +        // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +        // other spark processes running on the same box
    +        System.setProperty("spark.ui.port", "0")
     
    -      // Set the master property to match the requested mode.
    -      System.setProperty("spark.master", "yarn-cluster")
    +        // Set the master property to match the requested mode.
    +        System.setProperty("spark.master", "yarn-cluster")
     
    -      // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
    -      System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    -    }
    +        // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
    +        System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    +      }
     
    -    logInfo("ApplicationAttemptId: " + appAttemptId)
    +      logInfo("ApplicationAttemptId: " + appAttemptId)
     
    -    val cleanupHook = new Runnable {
    -      override def run() {
    -        // If the SparkContext is still registered, shut it down as a best case effort in case
    -        // users do not call sc.stop or do System.exit().
    -        val sc = sparkContextRef.get()
    -        if (sc != null) {
    -          logInfo("Invoking sc stop from shutdown hook")
    -          sc.stop()
    -          finish(FinalApplicationStatus.SUCCEEDED)
    -        }
    +      val cleanupHook = new Runnable {
    +        override def run() {
    +          // If the SparkContext is still registered, shut it down as a best case effort in case
    +          // users do not call sc.stop or do System.exit().
    +          val sc = sparkContextRef.get()
    +          if (sc != null) {
    +            logInfo("Invoking sc stop from shutdown hook")
    +            sc.stop()
    +          }
    +          val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    +          val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    +
    +          if (!finished) {
    +            // this shouldn't ever happen, but if it does assume weird failure
    +            finish(FinalApplicationStatus.FAILED,
    +              ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +              "shutdown hook called without cleanly finishing")
    +          }
     
    -        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
    -        // running the AM.
    -        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    -        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    -        if (finished || isLastAttempt) {
    -          cleanupStagingDir()
    +          if (!unregistered) {
    +            // we only want to unregister if we don't want the RM to retry
    +            if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
    +              unregister(finalStatus, finalMsg)
    +              cleanupStagingDir()
    +            }
    +          }
             }
           }
    -    }
     
    -    // Use higher priority than FileSystem.
    -    assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
    -    ShutdownHookManager
    -      .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
    +      // Use higher priority than FileSystem.
    +      assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
    +      ShutdownHookManager
    +        .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
     
    -    // Call this to force generation of secret so it gets populated into the
    -    // Hadoop UGI. This has to happen before the startUserClass which does a
    -    // doAs in order for the credentials to be passed on to the executor containers.
    -    val securityMgr = new SecurityManager(sparkConf)
    +      // Call this to force generation of secret so it gets populated into the
    +      // Hadoop UGI. This has to happen before the startUserClass which does a
    +      // doAs in order for the credentials to be passed on to the executor containers.
    +      val securityMgr = new SecurityManager(sparkConf)
     
    -    if (isDriver) {
    -      runDriver(securityMgr)
    -    } else {
    -      runExecutorLauncher(securityMgr)
    +      if (isDriver) {
    +        runDriver(securityMgr)
    +      } else {
    +        runExecutorLauncher(securityMgr)
    +      }
    +    } catch {
    +      case e: Throwable => {
    +        // catch everything else if not specifically handled
    +        logError("Uncaught exception: ", e)
    +        finish(FinalApplicationStatus.FAILED,
    +          ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +          "Uncaught exception: " + e.getMessage())
    +      }
         }
    +    exitCode
    +  }
     
    -    if (finalStatus != FinalApplicationStatus.UNDEFINED) {
    -      finish(finalStatus)
    -      0
    -    } else {
    -      1
    +  /**
    +   * unregister is used to completely unregister the application from the ResourceManager.
    +   * This means the ResourceManager will not retry the application attempt on your behalf if
    +   * a failure occurred.
    +   */
    +  final def unregister(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +    if (!unregistered) {
    +      logInfo(s"Unregistering ApplicationMaster with $status" +
    +        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    +      unregistered = true
    +      client.unregister(status, Option(diagnostics).getOrElse(""))
         }
       }
     
    -  final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +  final def finish(status: FinalApplicationStatus, code: Int, msg: String = null) = synchronized {
         if (!finished) {
    -      logInfo(s"Finishing ApplicationMaster with $status"  +
    -        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    -      finished = true
    +      logInfo(s"Final app status: ${status}, exitCode: ${code}" +
    +        Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
    +      exitCode = code
           finalStatus = status
    -      try {
    -        if (Thread.currentThread() != reporterThread) {
    +      finalMsg = msg
    +      finished = true
    +      if (Thread.currentThread() != reporterThread && Option(reporterThread).isDefined) {
    +        logDebug("shutting down reporter thread")
    +        try {
               reporterThread.interrupt()
    -          reporterThread.join()
    +        } catch {
    +          case e => {
    +            logError("Exception trying to shutdown report thread", e)
    +            // since main thread (client mode) could be waiting on this to finish
    +            // just exit here
    +            System.exit(ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION)
    +          }
    +        }
    +      }
    +      if (Thread.currentThread() != userClassThread && Option(userClassThread).isDefined) {
    +        val sc = sparkContextRef.get()
    +        if (sc != null) {
    +          logInfo("Invoking sc stop from finish")
    +          sc.stop()
    --- End diff --
    
    I was thinking it would be nicer (as far as like cleanup and such) to do the sc.stop() before the interrupt, in case the interrupt didn't end up behind handled nicely.  Note that under normal exit situations this wouldn't be invoked here.  Its when something else goes wrong (like max executor failures, etc).  
     Is there some condition you know its bad to call it?
    I'll do a few more tests on it to see what happens in both cases.  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18180627
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -71,80 +74,134 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       private val sparkContextRef = new AtomicReference[SparkContext](null)
     
       final def run(): Int = {
    -    val appAttemptId = client.getAttemptId()
    +    try {
    +      val appAttemptId = client.getAttemptId()
     
    -    if (isDriver) {
    -      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    -      // other spark processes running on the same box
    -      System.setProperty("spark.ui.port", "0")
    +      if (isDriver) {
    +        // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +        // other spark processes running on the same box
    +        System.setProperty("spark.ui.port", "0")
     
    -      // Set the master property to match the requested mode.
    -      System.setProperty("spark.master", "yarn-cluster")
    +        // Set the master property to match the requested mode.
    +        System.setProperty("spark.master", "yarn-cluster")
     
    -      // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
    -      System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    -    }
    +        // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
    +        System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    +      }
     
    -    logInfo("ApplicationAttemptId: " + appAttemptId)
    +      logInfo("ApplicationAttemptId: " + appAttemptId)
     
    -    val cleanupHook = new Runnable {
    -      override def run() {
    -        // If the SparkContext is still registered, shut it down as a best case effort in case
    -        // users do not call sc.stop or do System.exit().
    -        val sc = sparkContextRef.get()
    -        if (sc != null) {
    -          logInfo("Invoking sc stop from shutdown hook")
    -          sc.stop()
    -          finish(FinalApplicationStatus.SUCCEEDED)
    -        }
    +      val cleanupHook = new Runnable {
    +        override def run() {
    +          // If the SparkContext is still registered, shut it down as a best case effort in case
    +          // users do not call sc.stop or do System.exit().
    +          val sc = sparkContextRef.get()
    +          if (sc != null) {
    +            logInfo("Invoking sc stop from shutdown hook")
    +            sc.stop()
    +          }
    +          val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    +          val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    +
    +          if (!finished) {
    +            // this shouldn't ever happen, but if it does assume weird failure
    +            finish(FinalApplicationStatus.FAILED,
    +              ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +              "shutdown hook called without cleanly finishing")
    +          }
     
    -        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
    -        // running the AM.
    -        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    -        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    -        if (finished || isLastAttempt) {
    -          cleanupStagingDir()
    +          if (!unregistered) {
    +            // we only want to unregister if we don't want the RM to retry
    +            if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
    +              unregister(finalStatus, finalMsg)
    +              cleanupStagingDir()
    +            }
    +          }
             }
           }
    -    }
     
    -    // Use higher priority than FileSystem.
    -    assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
    -    ShutdownHookManager
    -      .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
    +      // Use higher priority than FileSystem.
    +      assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
    +      ShutdownHookManager
    +        .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
     
    -    // Call this to force generation of secret so it gets populated into the
    -    // Hadoop UGI. This has to happen before the startUserClass which does a
    -    // doAs in order for the credentials to be passed on to the executor containers.
    -    val securityMgr = new SecurityManager(sparkConf)
    +      // Call this to force generation of secret so it gets populated into the
    +      // Hadoop UGI. This has to happen before the startUserClass which does a
    +      // doAs in order for the credentials to be passed on to the executor containers.
    +      val securityMgr = new SecurityManager(sparkConf)
     
    -    if (isDriver) {
    -      runDriver(securityMgr)
    -    } else {
    -      runExecutorLauncher(securityMgr)
    +      if (isDriver) {
    +        runDriver(securityMgr)
    +      } else {
    +        runExecutorLauncher(securityMgr)
    +      }
    +    } catch {
    +      case e: Throwable => {
    +        // catch everything else if not specifically handled
    +        logError("Uncaught exception: ", e)
    +        finish(FinalApplicationStatus.FAILED,
    +          ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
    +          "Uncaught exception: " + e.getMessage())
    +      }
         }
    +    exitCode
    +  }
     
    -    if (finalStatus != FinalApplicationStatus.UNDEFINED) {
    -      finish(finalStatus)
    -      0
    -    } else {
    -      1
    +  /**
    +   * unregister is used to completely unregister the application from the ResourceManager.
    +   * This means the ResourceManager will not retry the application attempt on your behalf if
    +   * a failure occurred.
    +   */
    +  final def unregister(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +    if (!unregistered) {
    +      logInfo(s"Unregistering ApplicationMaster with $status" +
    +        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    +      unregistered = true
    +      client.unregister(status, Option(diagnostics).getOrElse(""))
         }
       }
     
    -  final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +  final def finish(status: FinalApplicationStatus, code: Int, msg: String = null) = synchronized {
         if (!finished) {
    -      logInfo(s"Finishing ApplicationMaster with $status"  +
    -        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    -      finished = true
    +      logInfo(s"Final app status: ${status}, exitCode: ${code}" +
    +        Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
    +      exitCode = code
           finalStatus = status
    -      try {
    -        if (Thread.currentThread() != reporterThread) {
    +      finalMsg = msg
    +      finished = true
    +      if (Thread.currentThread() != reporterThread && Option(reporterThread).isDefined) {
    +        logDebug("shutting down reporter thread")
    +        try {
               reporterThread.interrupt()
    -          reporterThread.join()
    +        } catch {
    +          case e => {
    +            logError("Exception trying to shutdown report thread", e)
    +            // since main thread (client mode) could be waiting on this to finish
    +            // just exit here
    +            System.exit(ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION)
    +          }
    +        }
    +      }
    +      if (Thread.currentThread() != userClassThread && Option(userClassThread).isDefined) {
    +        val sc = sparkContextRef.get()
    +        if (sc != null) {
    +          logInfo("Invoking sc stop from finish")
    +          sc.stop()
    +        }
    +        logDebug("shutting down user thread")
    +        try {
    +          // since we don't know what the user thread is doing at the time
    +          // of interrupt catch exception so we still exit. For instance
    +          // we could get a java.nio.channels.ClosedByInterruptException.
    +          userClassThread.interrupt()
    +        } catch {
    +          case e => {
    +            logError("Exception trying to shutdown user thread", e)
    --- End diff --
    
    I figured it didn't hurt anything to interrupt it if its not alive.  I can add the checks though if you would like.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2577#discussion_r18173150
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -450,6 +539,15 @@ object ApplicationMaster extends Logging {
     
       val SHUTDOWN_HOOK_PRIORITY: Int = 30
     
    +  // exit codes for different causes, no reason behind the values
    --- End diff --
    
    The application Master is not an executor so I chose not to use it. It also doesn't have the same exit reasons which could be useful if the user has an exit code and wants to know what that matches up to


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/2577#issuecomment-57313993
  
    thanks for the review @vanzin.  I've updated it. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3627] - [yarn] - fix exit code and fina...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2577#issuecomment-57809815
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21250/consoleFull) for   PR 2577 at commit [`e8cc261`](https://github.com/apache/spark/commit/e8cc261ba8e8b639d2fd375638ae5bb0925c1411).
     * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org