You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text version.
Posted to commits@spark.apache.org by tg...@apache.org on 2014/09/23 18:40:36 UTC

git commit: [SPARK-3304] [YARN] ApplicationMaster's Finish status is wrong when uncaught exception is thrown from ReporterThread

Repository: spark
Updated Branches:
  refs/heads/master c4022dd52 -> 11c10df82


[SPARK-3304] [YARN] ApplicationMaster's Finish status is wrong when uncaught exception is thrown from ReporterThread

Author: Kousuke Saruta <sa...@oss.nttdata.co.jp>

Closes #2198 from sarutak/SPARK-3304 and squashes the following commits:

2696237 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304
5b80363 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304
4eb0a3e [Kousuke Saruta] Removed the description about spark.yarn.scheduler.reporterThread.maxFailure
9741597 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304
f7538d4 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304
358ef8d [Kousuke Saruta] Merge branch 'SPARK-3304' of github.com:sarutak/spark into SPARK-3304
0d138c6 [Kousuke Saruta] Revert "tmp"
f8da10a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304
b6e9879 [Kousuke Saruta] tmp
8d256ed [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304
13b2652 [Kousuke Saruta] Merge branch 'SPARK-3304' of github.com:sarutak/spark into SPARK-3304
2711e15 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304
c081f8e [Kousuke Saruta] Modified ApplicationMaster to handle exception in ReporterThread itself
0bbd3a6 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3304
a6982ad [Kousuke Saruta] Added ability handling uncaught exception thrown from Reporter thread


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/11c10df8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/11c10df8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/11c10df8

Branch: refs/heads/master
Commit: 11c10df825419372df61a8d23c51e8c3cc78047f
Parents: c4022dd
Author: Kousuke Saruta <sa...@oss.nttdata.co.jp>
Authored: Tue Sep 23 11:40:14 2014 -0500
Committer: Thomas Graves <tg...@apache.org>
Committed: Tue Sep 23 11:40:14 2014 -0500

----------------------------------------------------------------------
 .../spark/deploy/yarn/ApplicationMaster.scala   | 66 ++++++++++++++++----
 1 file changed, 54 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/11c10df8/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index cde5fff..9050808 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -17,7 +17,10 @@
 
 package org.apache.spark.deploy.yarn
 
+import scala.util.control.NonFatal
+
 import java.io.IOException
+import java.lang.reflect.InvocationTargetException
 import java.net.Socket
 import java.util.concurrent.atomic.AtomicReference
 
@@ -55,6 +58,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
 
   @volatile private var finished = false
   @volatile private var finalStatus = FinalApplicationStatus.UNDEFINED
+  @volatile private var userClassThread: Thread = _
 
   private var reporterThread: Thread = _
   private var allocator: YarnAllocator = _
@@ -221,18 +225,48 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     // must be <= expiryInterval / 2.
     val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval))
 
+    // The number of failures in a row until Reporter thread give up
+    val reporterMaxFailures = sparkConf.getInt("spark.yarn.scheduler.reporterThread.maxFailures", 5)
+
     val t = new Thread {
       override def run() {
+        var failureCount = 0
+
         while (!finished) {
-          checkNumExecutorsFailed()
-          if (!finished) {
-            logDebug("Sending progress")
-            allocator.allocateResources()
-            try {
-              Thread.sleep(interval)
-            } catch {
-              case e: InterruptedException =>
+          try {
+            checkNumExecutorsFailed()
+            if (!finished) {
+              logDebug("Sending progress")
+              allocator.allocateResources()
             }
+            failureCount = 0
+          } catch {
+            case e: Throwable => {
+              failureCount += 1
+              if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
+                logError("Exception was thrown from Reporter thread.", e)
+                finish(FinalApplicationStatus.FAILED, "Exception was thrown" +
+                  s"${failureCount} time(s) from Reporter thread.")
+
+                /**
+                 * If exception is thrown from ReporterThread,
+                 * interrupt user class to stop.
+                 * Without this interrupting, if exception is
+                 * thrown before allocating enough executors,
+                 * YarnClusterScheduler waits until timeout even though
+                 * we cannot allocate executors.
+                 */
+                logInfo("Interrupting user class to stop.")
+                userClassThread.interrupt
+              } else {
+                logWarning(s"Reporter thread fails ${failureCount} time(s) in a row.", e)
+              }
+            }
+          }
+          try {
+            Thread.sleep(interval)
+          } catch {
+            case e: InterruptedException =>
           }
         }
       }
@@ -355,7 +389,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     val mainMethod = Class.forName(args.userClass, false,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
 
-    val t = new Thread {
+    userClassThread = new Thread {
       override def run() {
         var status = FinalApplicationStatus.FAILED
         try {
@@ -366,15 +400,23 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
           // Some apps have "System.exit(0)" at the end.  The user thread will stop here unless
           // it has an uncaught exception thrown out.  It needs a shutdown hook to set SUCCEEDED.
           status = FinalApplicationStatus.SUCCEEDED
+        } catch {
+          case e: InvocationTargetException => {
+            e.getCause match {
+              case _: InterruptedException => {
+                // Reporter thread can interrupt to stop user class
+              }
+            }
+          }
         } finally {
           logDebug("Finishing main")
         }
         finalStatus = status
       }
     }
-    t.setName("Driver")
-    t.start()
-    t
+    userClassThread.setName("Driver")
+    userClassThread.start()
+    userClassThread
   }
 
   // Actor used to monitor the driver when running in client deploy mode.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org