You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by vanzin <gi...@git.apache.org> on 2018/02/01 23:21:21 UTC

[GitHub] spark pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn client mode

Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19616#discussion_r165518845
  
    --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
    @@ -1104,14 +1117,39 @@ private[spark] class Client(
           if (returnOnRunning && state == YarnApplicationState.RUNNING) {
             return (state, report.getFinalApplicationStatus)
           }
    -
    +      if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled
    +        && !amServiceStarted && report.getAMRMToken != null) {
    +        amServiceStarted = true
    +        startApplicationMasterService(report)
    +      }
           lastState = state
         }
     
         // Never reached, but keeps compiler happy
         throw new SparkException("While loop is depleted! This should never happen...")
       }
     
    +  private def startApplicationMasterService(report: ApplicationReport) = {
    +    // Add AMRMToken to establish connection between RM and AM
    +    val token = report.getAMRMToken
    +    val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] =
    +      new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](token
    +        .getIdentifier().array(), token.getPassword().array, new Text(
    +        token.getKind()), new Text(token.getService()))
    +    val currentUGI = UserGroupInformation.getCurrentUser
    +    currentUGI.addToken(amRMToken)
    +
    +    System.setProperty(
    +      ApplicationConstants.Environment.CONTAINER_ID.name(),
    +      ContainerId.newContainerId(report.getCurrentApplicationAttemptId, 1).toString)
    +    val amArgs = new ApplicationMasterArguments(Array("--arg",
    +      sparkConf.get("spark.driver.host") + ":" + sparkConf.get("spark.driver.port")))
    +    // Start Application Service in a separate thread and continue with application monitoring
    +    new Thread() {
    --- End diff --
    
    Don't you want to keep a reference to this thread and join it at some point, to make sure it really goes away? Should it be a daemon thread instead?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org