You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2018/03/20 01:02:12 UTC

spark git commit: [SPARK-23660] Fix exception in yarn cluster mode when application ended fast

Repository: spark
Updated Branches:
  refs/heads/master f15906da1 -> 5f4deff19


[SPARK-23660] Fix exception in yarn cluster mode when application ended fast

## What changes were proposed in this pull request?

Yarn throws the following exception in cluster mode when the application is really small:

```
18/03/07 23:34:22 WARN netty.NettyRpcEnv: Ignored failure: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask7c974942 rejected from java.util.concurrent.ScheduledThreadPoolExecutor1eea9d2d[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
18/03/07 23:34:22 ERROR yarn.ApplicationMaster: Uncaught exception:
org.apache.spark.SparkException: Exception thrown in awaitResult:
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:76)
	at org.apache.spark.deploy.yarn.YarnAllocator.<init>(YarnAllocator.scala:102)
	at org.apache.spark.deploy.yarn.YarnRMClient.register(YarnRMClient.scala:77)
	at org.apache.spark.deploy.yarn.ApplicationMaster.registerAM(ApplicationMaster.scala:450)
	at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:493)
	at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:345)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply$mcV$sp(ApplicationMaster.scala:260)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply(ApplicationMaster.scala:260)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply(ApplicationMaster.scala:260)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$5.run(ApplicationMaster.scala:810)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
	at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:809)
	at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:259)
	at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:834)
	at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
Caused by: org.apache.spark.rpc.RpcEnvStoppedException: RpcEnv already stopped.
	at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:158)
	at org.apache.spark.rpc.netty.Dispatcher.postLocalMessage(Dispatcher.scala:135)
	at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:229)
	at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:523)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:91)
	... 17 more
18/03/07 23:34:22 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 13, (reason: Uncaught exception: org.apache.spark.SparkException: Exception thrown in awaitResult: )
```

Example application:

```
object ExampleApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ExampleApp")
    val sc = new SparkContext(conf)
    try {
      // Do nothing
    } finally {
      sc.stop()
    }
  }
```

This PR pauses user class thread after `SparkContext` created and keeps it so until application master initialises properly.

## How was this patch tested?

Automated: Existing unit tests
Manual: Application submitted into small cluster

Author: Gabor Somogyi <ga...@gmail.com>

Closes #20807 from gaborgsomogyi/SPARK-23660.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5f4deff1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5f4deff1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5f4deff1

Branch: refs/heads/master
Commit: 5f4deff19511b6870f056eba5489104b9cac05a9
Parents: f15906d
Author: Gabor Somogyi <ga...@gmail.com>
Authored: Mon Mar 19 18:02:04 2018 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Mon Mar 19 18:02:04 2018 -0700

----------------------------------------------------------------------
 .../spark/deploy/yarn/ApplicationMaster.scala      | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5f4deff1/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 2f88feb..6e35d23 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -418,7 +418,19 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
   }
 
   private def sparkContextInitialized(sc: SparkContext) = {
-    sparkContextPromise.success(sc)
+    sparkContextPromise.synchronized {
+      // Notify runDriver function that SparkContext is available
+      sparkContextPromise.success(sc)
+      // Pause the user class thread in order to make proper initialization in runDriver function.
+      sparkContextPromise.wait()
+    }
+  }
+
+  private def resumeDriver(): Unit = {
+    // When initialization in runDriver happened the user class thread has to be resumed.
+    sparkContextPromise.synchronized {
+      sparkContextPromise.notify()
+    }
   }
 
   private def registerAM(
@@ -497,6 +509,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
         // if the user app did not create a SparkContext.
         throw new IllegalStateException("User did not initialize spark context!")
       }
+      resumeDriver()
       userClassThread.join()
     } catch {
       case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
@@ -506,6 +519,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
         finish(FinalApplicationStatus.FAILED,
           ApplicationMaster.EXIT_SC_NOT_INITED,
           "Timed out waiting for SparkContext.")
+    } finally {
+      resumeDriver()
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org