You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2014/09/23 18:40:36 UTC
git commit: [SPARK-3304] [YARN] ApplicationMaster's Finish status is
wrong when uncaught exception is thrown from ReporterThread
Repository: spark
Updated Branches:
refs/heads/master c4022dd52 -> 11c10df82
[SPARK-3304] [YARN] ApplicationMaster's Finish status is wrong when uncaught exception is thrown from ReporterThread
Author: Kousuke Saruta <sa...@oss.nttdata.co.jp>
Closes #2198 from sarutak/SPARK-3304 and squashes the following commits:
2696237 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304
5b80363 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304
4eb0a3e [Kousuke Saruta] Removed the description about spark.yarn.scheduler.reporterThread.maxFailure
9741597 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304
f7538d4 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304
358ef8d [Kousuke Saruta] Merge branch 'SPARK-3304' of github.com:sarutak/spark into SPARK-3304
0d138c6 [Kousuke Saruta] Revert "tmp"
f8da10a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304
b6e9879 [Kousuke Saruta] tmp
8d256ed [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304
13b2652 [Kousuke Saruta] Merge branch 'SPARK-3304' of github.com:sarutak/spark into SPARK-3304
2711e15 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304
c081f8e [Kousuke Saruta] Modified ApplicationMaster to handle exception in ReporterThread itself
0bbd3a6 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304
a6982ad [Kousuke Saruta] Added ability to handle uncaught exceptions thrown from Reporter thread
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/11c10df8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/11c10df8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/11c10df8
Branch: refs/heads/master
Commit: 11c10df825419372df61a8d23c51e8c3cc78047f
Parents: c4022dd
Author: Kousuke Saruta <sa...@oss.nttdata.co.jp>
Authored: Tue Sep 23 11:40:14 2014 -0500
Committer: Thomas Graves <tg...@apache.org>
Committed: Tue Sep 23 11:40:14 2014 -0500
----------------------------------------------------------------------
.../spark/deploy/yarn/ApplicationMaster.scala | 66 ++++++++++++++++----
1 file changed, 54 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/11c10df8/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index cde5fff..9050808 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -17,7 +17,10 @@
package org.apache.spark.deploy.yarn
+import scala.util.control.NonFatal
+
import java.io.IOException
+import java.lang.reflect.InvocationTargetException
import java.net.Socket
import java.util.concurrent.atomic.AtomicReference
@@ -55,6 +58,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
@volatile private var finished = false
@volatile private var finalStatus = FinalApplicationStatus.UNDEFINED
+ @volatile private var userClassThread: Thread = _
private var reporterThread: Thread = _
private var allocator: YarnAllocator = _
@@ -221,18 +225,48 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
// must be <= expiryInterval / 2.
val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval))
+ // The number of consecutive failures after which the Reporter thread gives up
+ val reporterMaxFailures = sparkConf.getInt("spark.yarn.scheduler.reporterThread.maxFailures", 5)
+
val t = new Thread {
override def run() {
+ var failureCount = 0
+
while (!finished) {
- checkNumExecutorsFailed()
- if (!finished) {
- logDebug("Sending progress")
- allocator.allocateResources()
- try {
- Thread.sleep(interval)
- } catch {
- case e: InterruptedException =>
+ try {
+ checkNumExecutorsFailed()
+ if (!finished) {
+ logDebug("Sending progress")
+ allocator.allocateResources()
}
+ failureCount = 0
+ } catch {
+ case e: Throwable => {
+ failureCount += 1
+ if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
+ logError("Exception was thrown from Reporter thread.", e)
+ finish(FinalApplicationStatus.FAILED, "Exception was thrown" +
+ s"${failureCount} time(s) from Reporter thread.")
+
+ /**
+ * If the Reporter thread gives up because of an exception,
+ * interrupt the user class thread so that it stops.
+ * Without this interrupt, if the exception is
+ * thrown before enough executors have been allocated,
+ * YarnClusterScheduler waits until its timeout even though
+ * we can no longer allocate executors.
+ */
+ logInfo("Interrupting user class to stop.")
+ userClassThread.interrupt
+ } else {
+ logWarning(s"Reporter thread fails ${failureCount} time(s) in a row.", e)
+ }
+ }
+ }
+ try {
+ Thread.sleep(interval)
+ } catch {
+ case e: InterruptedException =>
}
}
}
@@ -355,7 +389,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
val mainMethod = Class.forName(args.userClass, false,
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
- val t = new Thread {
+ userClassThread = new Thread {
override def run() {
var status = FinalApplicationStatus.FAILED
try {
@@ -366,15 +400,23 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
// Some apps have "System.exit(0)" at the end. The user thread will stop here unless
// it has an uncaught exception thrown out. It needs a shutdown hook to set SUCCEEDED.
status = FinalApplicationStatus.SUCCEEDED
+ } catch {
+ case e: InvocationTargetException => {
+ e.getCause match {
+ case _: InterruptedException => {
+ // Reporter thread can interrupt to stop user class
+ }
+ }
+ }
} finally {
logDebug("Finishing main")
}
finalStatus = status
}
}
- t.setName("Driver")
- t.start()
- t
+ userClassThread.setName("Driver")
+ userClassThread.start()
+ userClassThread
}
// Actor used to monitor the driver when running in client deploy mode.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org