You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2017/07/12 07:15:01 UTC
spark git commit: [SPARK-21146][CORE] Master/Worker should handle and
shutdown when any thread gets UncaughtException
Repository: spark
Updated Branches:
refs/heads/master 24367f23f -> e16e8c7ad
[SPARK-21146][CORE] Master/Worker should handle and shutdown when any thread gets UncaughtException
## What changes were proposed in this pull request?
Adds a default UncaughtExceptionHandler to the Master and Worker daemons.
## How was this patch tested?
I verified it manually: when any Worker thread throws an uncaught exception, the default UncaughtExceptionHandler handles it.
Author: Devaraj K <de...@apache.org>
Closes #18357 from devaraj-kavali/SPARK-21146.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e16e8c7a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e16e8c7a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e16e8c7a
Branch: refs/heads/master
Commit: e16e8c7ad31762aaca5e2bc874de1540af9cc4b7
Parents: 24367f2
Author: Devaraj K <de...@apache.org>
Authored: Wed Jul 12 00:14:58 2017 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Jul 12 00:14:58 2017 -0700
----------------------------------------------------------------------
.../scala/org/apache/spark/deploy/master/Master.scala | 4 +++-
.../scala/org/apache/spark/deploy/worker/Worker.scala | 4 +++-
.../main/scala/org/apache/spark/executor/Executor.scala | 2 +-
.../spark/util/SparkUncaughtExceptionHandler.scala | 11 ++++++-----
core/src/main/scala/org/apache/spark/util/Utils.scala | 4 +++-
.../spark/deploy/mesos/MesosClusterDispatcher.scala | 2 +-
6 files changed, 17 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e16e8c7a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 0dee25f..4cc580e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -36,7 +36,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rpc._
import org.apache.spark.serializer.{JavaSerializer, Serializer}
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
private[deploy] class Master(
override val rpcEnv: RpcEnv,
@@ -1045,6 +1045,8 @@ private[deploy] object Master extends Logging {
val ENDPOINT_NAME = "Master"
def main(argStrings: Array[String]) {
+ Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
+ exitOnUncaughtException = false))
Utils.initDaemon(log)
val conf = new SparkConf
val args = new MasterArguments(argStrings, conf)
http://git-wip-us.apache.org/repos/asf/spark/blob/e16e8c7a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index bed4745..f6d3876 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -38,7 +38,7 @@ import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rpc._
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils}
private[deploy] class Worker(
override val rpcEnv: RpcEnv,
@@ -737,6 +737,8 @@ private[deploy] object Worker extends Logging {
val ENDPOINT_NAME = "Worker"
def main(argStrings: Array[String]) {
+ Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
+ exitOnUncaughtException = false))
Utils.initDaemon(log)
val conf = new SparkConf
val args = new WorkerArguments(argStrings, conf)
http://git-wip-us.apache.org/repos/asf/spark/blob/e16e8c7a/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 19e7eb0..21f0db1 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -56,7 +56,7 @@ private[spark] class Executor(
env: SparkEnv,
userClassPath: Seq[URL] = Nil,
isLocal: Boolean = false,
- uncaughtExceptionHandler: UncaughtExceptionHandler = SparkUncaughtExceptionHandler)
+ uncaughtExceptionHandler: UncaughtExceptionHandler = new SparkUncaughtExceptionHandler)
extends Logging {
logInfo(s"Starting executor ID $executorId on host $executorHostname")
http://git-wip-us.apache.org/repos/asf/spark/blob/e16e8c7a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
index 95bf3f5..e0f5af5 100644
--- a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
+++ b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
@@ -20,11 +20,12 @@ package org.apache.spark.util
import org.apache.spark.internal.Logging
/**
- * The default uncaught exception handler for Executors terminates the whole process, to avoid
- * getting into a bad state indefinitely. Since Executors are relatively lightweight, it's better
- * to fail fast when things go wrong.
+ * The default uncaught exception handler for Spark daemons. Unless already in shutdown, it exits
+ * on OutOfMemoryError, and on any other Throwable only when exitOnUncaughtException is true.
+ *
+ * @param exitOnUncaughtException Whether to exit the process on UncaughtException.
*/
-private[spark] object SparkUncaughtExceptionHandler
+private[spark] class SparkUncaughtExceptionHandler(val exitOnUncaughtException: Boolean = true)
extends Thread.UncaughtExceptionHandler with Logging {
override def uncaughtException(thread: Thread, exception: Throwable) {
@@ -40,7 +41,7 @@ private[spark] object SparkUncaughtExceptionHandler
if (!ShutdownHookManager.inShutdown()) {
if (exception.isInstanceOf[OutOfMemoryError]) {
System.exit(SparkExitCode.OOM)
- } else {
+ } else if (exitOnUncaughtException) {
System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/e16e8c7a/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index b4caf68..584337a 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -76,6 +76,8 @@ private[spark] object CallSite {
private[spark] object Utils extends Logging {
val random = new Random()
+ private val sparkUncaughtExceptionHandler = new SparkUncaughtExceptionHandler
+
/**
* Define a default value for driver memory here since this value is referenced across the code
* base and nearly all files already use Utils.scala
@@ -1274,7 +1276,7 @@ private[spark] object Utils extends Logging {
block
} catch {
case e: ControlThrowable => throw e
- case t: Throwable => SparkUncaughtExceptionHandler.uncaughtException(t)
+ case t: Throwable => sparkUncaughtExceptionHandler.uncaughtException(t)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/e16e8c7a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
index 38b082a..aa378c9 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
@@ -97,7 +97,7 @@ private[mesos] object MesosClusterDispatcher
with CommandLineUtils {
override def main(args: Array[String]) {
- Thread.setDefaultUncaughtExceptionHandler(SparkUncaughtExceptionHandler)
+ Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler)
Utils.initDaemon(log)
val conf = new SparkConf
val dispatcherArgs = new MesosClusterDispatcherArguments(args, conf)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org