Posted to commits@spark.apache.org by we...@apache.org on 2020/09/18 07:47:18 UTC

[spark] branch branch-3.0 updated: [SPARK-32905][CORE][YARN] ApplicationMaster fails to receive UpdateDelegationTokens message

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new ffcd757  [SPARK-32905][CORE][YARN] ApplicationMaster fails to receive UpdateDelegationTokens message
ffcd757 is described below

commit ffcd757cf54583dfd589b42e87897fe8a255077f
Author: Kent Yao <ya...@hotmail.com>
AuthorDate: Fri Sep 18 07:41:21 2020 +0000

    [SPARK-32905][CORE][YARN] ApplicationMaster fails to receive UpdateDelegationTokens message
    
    ### What changes were proposed in this pull request?
    
    For a long-running application in kerberized mode, the AMEndpoint mishandles the `UpdateDelegationTokens` message: it is a one-way message and should therefore be handled in the `receive` function, not in `receiveAndReply`.
    
    ```
    20-09-15 18:53:01 INFO yarn.YarnAllocator: Received 22 containers from YARN, launching executors on 0 of them.
    20-09-16 12:52:28 ERROR netty.Inbox: Ignoring error
    org.apache.spark.SparkException: NettyRpcEndpointRef(spark-client://YarnAM) does not implement 'receive'
    	at org.apache.spark.rpc.RpcEndpoint$$anonfun$receive$1.applyOrElse(RpcEndpoint.scala:70)
    	at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
    	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:203)
    	at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    	at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    	at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	at java.lang.Thread.run(Thread.java:748)
    20-09-17 06:52:28 ERROR netty.Inbox: Ignoring error
    org.apache.spark.SparkException: NettyRpcEndpointRef(spark-client://YarnAM) does not implement 'receive'
    	at org.apache.spark.rpc.RpcEndpoint$$anonfun$receive$1.applyOrElse(RpcEndpoint.scala:70)
    	at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
    	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:203)
    	at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    	at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    	at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	at java.lang.Thread.run(Thread.java:748)
    ```
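    
    For background: Spark's RPC layer dispatches one-way messages (sent with `send`) to an endpoint's `receive`, and ask-style messages (sent with `ask`) to `receiveAndReply`. A case that only appears in `receiveAndReply` is unreachable for a one-way message, which then falls through to the default `receive` and raises the `SparkException` above. A minimal, self-contained sketch of that dispatch rule (simplified stand-in types, not the actual Spark source):
    
    ```scala
    // Simplified stand-ins for Spark's RPC types (illustrative only).
    trait RpcCallContext { def reply(response: Any): Unit }
    
    trait RpcEndpoint {
      // The default `receive` rejects everything, producing the
      // "does not implement 'receive'" error seen in the log above.
      def receive: PartialFunction[Any, Unit] = {
        case msg => throw new IllegalStateException(s"does not implement 'receive': $msg")
      }
      def receiveAndReply(ctx: RpcCallContext): PartialFunction[Any, Unit]
    }
    
    case class UpdateDelegationTokens(tokens: Array[Byte])
    
    // After this fix, the AM endpoint overrides `receive` for the one-way
    // UpdateDelegationTokens message instead of matching it in receiveAndReply.
    class AMEndpointSketch extends RpcEndpoint {
      override def receive: PartialFunction[Any, Unit] = {
        case UpdateDelegationTokens(tokens) =>
          println(s"updating ${tokens.length} bytes of delegation tokens")
      }
      override def receiveAndReply(ctx: RpcCallContext): PartialFunction[Any, Unit] = {
        case req => ctx.reply(s"handled ask: $req")
      }
    }
    ```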
    
    ### Why are the changes needed?
    
    Bug fix: without a working token-refresh path, long-running applications in a kerberized cluster will eventually fail once their initial delegation tokens expire.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    Passing Jenkins, and verified manually.
    
    I am running the sub-module `kyuubi-spark-sql-engine` of https://github.com/yaooqinn/kyuubi
    
    The simplest way to reproduce the bug and verify this fix is to follow these steps:
    
    #### 1. Build the `kyuubi-spark-sql-engine` module
    ```
    mvn clean package -pl :kyuubi-spark-sql-engine
    ```
    #### 2. Configure Spark with Kerberos settings for your secured cluster
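    
    As a minimal sketch of this step (the principal and keytab path below are placeholders; use the values for your own cluster), the Kerberos identity can go into `spark-defaults.conf`:
    
    ```
    spark.kerberos.principal    user@EXAMPLE.COM
    spark.kerberos.keytab       /path/to/user.keytab
    ```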
    
    #### 3. Start it in the background
    ```
    nohup bin/spark-submit --class org.apache.kyuubi.engine.spark.SparkSQLEngine ../kyuubi-spark-sql-engine-1.0.0-SNAPSHOT.jar > kyuubi.log &
    ```
    
    #### 4. Check the AM log
    
    "Updating delegation tokens ..." indicates SUCCESS
    
    "Inbox: Ignoring error ...... does not implement 'receive'" indicates FAILURE
    
    Closes #29777 from yaooqinn/SPARK-32905.
    
    Authored-by: Kent Yao <ya...@hotmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 9e9d4b6994a29fb139fd50d24b5418a900c7f072)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala    | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 862acd8..99efa5c 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -775,6 +775,11 @@ private[spark] class ApplicationMaster(
       driver.send(RegisterClusterManager(self))
     }
 
+    override def receive: PartialFunction[Any, Unit] = {
+      case UpdateDelegationTokens(tokens) =>
+        SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
+    }
+
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
       case r: RequestExecutors =>
         Option(allocator) match {
@@ -806,9 +811,6 @@ private[spark] class ApplicationMaster(
           case None =>
             logWarning("Container allocator is not ready to find executor loss reasons yet.")
         }
-
-      case UpdateDelegationTokens(tokens) =>
-        SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
     }
 
     override def onDisconnected(remoteAddress: RpcAddress): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org