You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2018/12/14 20:34:38 UTC

[GitHub] devaraj-kavali commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode

devaraj-kavali commented on a change in pull request #19616: [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode
URL: https://github.com/apache/spark/pull/19616#discussion_r241883955
 
 

 ##########
 File path: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
 ##########
 @@ -1084,14 +1095,38 @@ private[spark] class Client(
       if (returnOnRunning && state == YarnApplicationState.RUNNING) {
         return createAppReport(report)
       }
-
+      if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled
+        && !amServiceStarted && report.getAMRMToken != null) {
+        amServiceStarted = true
+        startApplicationMasterService(report)
+      }
       lastState = state
     }
 
     // Never reached, but keeps compiler happy
     throw new SparkException("While loop is depleted! This should never happen...")
   }
 
+  private def startApplicationMasterService(report: ApplicationReport) = {
+    // Add AMRMToken to establish connection between RM and AM
+    val token = report.getAMRMToken
+    val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] =
+      new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](token
+        .getIdentifier().array(), token.getPassword().array, new Text(
+        token.getKind()), new Text(token.getService()))
+    val currentUGI = UserGroupInformation.getCurrentUser
+    currentUGI.addToken(amRMToken)
+
+    sparkConf.set("spark.yarn.containerId",
+      ContainerId.newContainerId(report.getCurrentApplicationAttemptId, 1).toString)
+    // Start Application Service in a separate thread and continue with application monitoring
+    val amService = new Thread() {
 
 Review comment:
   added thread name

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org