You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2018/03/20 01:02:12 UTC
spark git commit: [SPARK-23660] Fix exception in yarn cluster mode
when application ended fast
Repository: spark
Updated Branches:
refs/heads/master f15906da1 -> 5f4deff19
[SPARK-23660] Fix exception in yarn cluster mode when application ended fast
## What changes were proposed in this pull request?
Yarn throws the following exception in cluster mode when the application is really small:
```
18/03/07 23:34:22 WARN netty.NettyRpcEnv: Ignored failure: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@7c974942 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@1eea9d2d[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
18/03/07 23:34:22 ERROR yarn.ApplicationMaster: Uncaught exception:
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:76)
at org.apache.spark.deploy.yarn.YarnAllocator.<init>(YarnAllocator.scala:102)
at org.apache.spark.deploy.yarn.YarnRMClient.register(YarnRMClient.scala:77)
at org.apache.spark.deploy.yarn.ApplicationMaster.registerAM(ApplicationMaster.scala:450)
at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:493)
at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:345)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply$mcV$sp(ApplicationMaster.scala:260)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply(ApplicationMaster.scala:260)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$2.apply(ApplicationMaster.scala:260)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$5.run(ApplicationMaster.scala:810)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:809)
at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:259)
at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:834)
at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
Caused by: org.apache.spark.rpc.RpcEnvStoppedException: RpcEnv already stopped.
at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:158)
at org.apache.spark.rpc.netty.Dispatcher.postLocalMessage(Dispatcher.scala:135)
at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:229)
at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:523)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:91)
... 17 more
18/03/07 23:34:22 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 13, (reason: Uncaught exception: org.apache.spark.SparkException: Exception thrown in awaitResult: )
```
Example application:
```
object ExampleApp {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("ExampleApp")
val sc = new SparkContext(conf)
try {
// Do nothing
} finally {
sc.stop()
}
}
}
```
This PR pauses the user class thread after the `SparkContext` is created and keeps it paused until the application master has initialised properly.
## How was this patch tested?
Automated: Existing unit tests
Manual: Application submitted into small cluster
Author: Gabor Somogyi <ga...@gmail.com>
Closes #20807 from gaborgsomogyi/SPARK-23660.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5f4deff1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5f4deff1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5f4deff1
Branch: refs/heads/master
Commit: 5f4deff19511b6870f056eba5489104b9cac05a9
Parents: f15906d
Author: Gabor Somogyi <ga...@gmail.com>
Authored: Mon Mar 19 18:02:04 2018 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Mon Mar 19 18:02:04 2018 -0700
----------------------------------------------------------------------
.../spark/deploy/yarn/ApplicationMaster.scala | 17 ++++++++++++++++-
1 file changed, 16 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/5f4deff1/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 2f88feb..6e35d23 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -418,7 +418,19 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
}
private def sparkContextInitialized(sc: SparkContext) = {
- sparkContextPromise.success(sc)
+ sparkContextPromise.synchronized {
+ // Notify runDriver function that SparkContext is available
+ sparkContextPromise.success(sc)
+ // Pause the user class thread in order to make proper initialization in runDriver function.
+ sparkContextPromise.wait()
+ }
+ }
+
+ private def resumeDriver(): Unit = {
+ // When initialization in runDriver happened the user class thread has to be resumed.
+ sparkContextPromise.synchronized {
+ sparkContextPromise.notify()
+ }
}
private def registerAM(
@@ -497,6 +509,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
// if the user app did not create a SparkContext.
throw new IllegalStateException("User did not initialize spark context!")
}
+ resumeDriver()
userClassThread.join()
} catch {
case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
@@ -506,6 +519,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_SC_NOT_INITED,
"Timed out waiting for SparkContext.")
+ } finally {
+ resumeDriver()
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org