Posted to commits@spark.apache.org by an...@apache.org on 2014/10/24 22:47:12 UTC

git commit: [SPARK-4067] refactor ExecutorUncaughtExceptionHandler

Repository: spark
Updated Branches:
  refs/heads/master b563987e8 -> f80dcf2ae


[SPARK-4067] refactor ExecutorUncaughtExceptionHandler

https://issues.apache.org/jira/browse/SPARK-4067

Currently we call Utils.tryOrExit from several components, not just the executor:

  AppClient
  Executor
  TaskSchedulerImpl

This makes the name ExecutorUncaughtExceptionHandler unfit for the real use case, so this commit renames it to SparkUncaughtExceptionHandler and moves it, together with the general exit codes, into org.apache.spark.util (see the sketch below).
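
For readers skimming the diff, here is a minimal, self-contained sketch of the pattern this commit renames: a process-wide uncaught-exception handler plus a tryOrExit-style wrapper that routes any non-control Throwable to it. DemoHandler and runOrDie are illustrative names only; the real implementations appear in the diff below.

    import scala.util.control.ControlThrowable

    // Illustrative stand-in for SparkUncaughtExceptionHandler: log, then exit.
    object DemoHandler extends Thread.UncaughtExceptionHandler {
      override def uncaughtException(thread: Thread, exception: Throwable): Unit = {
        System.err.println(s"Uncaught exception in $thread: $exception")
        sys.exit(50)  // mirrors SparkExitCode.UNCAUGHT_EXCEPTION
      }
      // Convenience overload mirroring the one in the real handler.
      def uncaughtException(exception: Throwable): Unit =
        uncaughtException(Thread.currentThread(), exception)
    }

    object RunOrDieDemo {
      // Illustrative equivalent of Utils.tryOrExit: run the block, let Scala's
      // control-flow throwables pass through, treat everything else as fatal.
      def runOrDie(block: => Unit): Unit = {
        try {
          block
        } catch {
          case e: ControlThrowable => throw e
          case t: Throwable => DemoHandler.uncaughtException(t)
        }
      }

      def main(args: Array[String]): Unit = {
        // An AppClient/TaskSchedulerImpl-style call site on a helper thread.
        runOrDie { println("periodic check ran") }
      }
    }

Since the handler exits the whole JVM, nothing in this pattern is executor-specific, which is exactly why the Executor prefix was misleading.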

Author: Nan Zhu <na...@Nans-MacBook-Pro.local>
Author: Nan Zhu <na...@nans-mbp.home>

Closes #2913 from CodingCat/SPARK-4067 and squashes the following commits:

035ee3d [Nan Zhu] make RAT happy
e62e416 [Nan Zhu] add some general Exit code
a10b63f [Nan Zhu] refactor


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f80dcf2a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f80dcf2a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f80dcf2a

Branch: refs/heads/master
Commit: f80dcf2aeef762ca370e91d2c7d6e4f7894c3cd8
Parents: b563987
Author: Nan Zhu <na...@Nans-MacBook-Pro.local>
Authored: Fri Oct 24 13:46:45 2014 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Fri Oct 24 13:47:06 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/executor/Executor.scala    |  6 +--
 .../spark/executor/ExecutorExitCode.scala       | 12 +----
 .../ExecutorUncaughtExceptionHandler.scala      | 53 --------------------
 .../org/apache/spark/util/SparkExitCode.scala   | 32 ++++++++++++
 .../util/SparkUncaughtExceptionHandler.scala    | 52 +++++++++++++++++++
 .../scala/org/apache/spark/util/Utils.scala     |  4 +-
 6 files changed, 91 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f80dcf2a/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 70a46c7..2889f59 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -33,7 +33,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler._
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{SparkUncaughtExceptionHandler, AkkaUtils, Utils}
 
 /**
  * Spark executor used with Mesos, YARN, and the standalone scheduler.
@@ -72,7 +72,7 @@ private[spark] class Executor(
     // Setup an uncaught exception handler for non-local mode.
     // Make any thread terminations due to uncaught exceptions kill the entire
     // executor process to avoid surprising stalls.
-    Thread.setDefaultUncaughtExceptionHandler(ExecutorUncaughtExceptionHandler)
+    Thread.setDefaultUncaughtExceptionHandler(SparkUncaughtExceptionHandler)
   }
 
   val executorSource = new ExecutorSource(this, executorId)
@@ -258,7 +258,7 @@ private[spark] class Executor(
           // Don't forcibly exit unless the exception was inherently fatal, to avoid
           // stopping other tasks unnecessarily.
           if (Utils.isFatalError(t)) {
-            ExecutorUncaughtExceptionHandler.uncaughtException(t)
+            SparkUncaughtExceptionHandler.uncaughtException(t)
           }
         }
       } finally {

http://git-wip-us.apache.org/repos/asf/spark/blob/f80dcf2a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
index 38be2c5..52862ae 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.executor
 
+import org.apache.spark.util.SparkExitCode._
+
 /**
  * These are exit codes that executors should use to provide the master with information about
  * executor failures assuming that cluster management framework can capture the exit codes (but
@@ -27,16 +29,6 @@ package org.apache.spark.executor
  */
 private[spark]
 object ExecutorExitCode {
-  /** The default uncaught exception handler was reached. */
-  val UNCAUGHT_EXCEPTION = 50
-
-  /** The default uncaught exception handler was called and an exception was encountered while
-      logging the exception. */
-  val UNCAUGHT_EXCEPTION_TWICE = 51
-
-  /** The default uncaught exception handler was reached, and the uncaught exception was an
-      OutOfMemoryError. */
-  val OOM = 52
 
   /** DiskStore failed to create a local temporary directory after many attempts. */
   val DISK_STORE_FAILED_TO_CREATE_DIR = 53
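
As the scaladoc above notes, these codes are only useful if the cluster management framework actually captures the executor's exit status. Below is a hypothetical supervisor illustrating that; ExitCodeWatcher, app.jar, and WorkerMain are made-up names, while the numeric codes are the ones touched by this commit:

    // Hypothetical supervisor: launch a worker JVM and interpret its exit status.
    object ExitCodeWatcher {
      def main(args: Array[String]): Unit = {
        val proc = new ProcessBuilder("java", "-cp", "app.jar", "WorkerMain")
          .inheritIO()
          .start()
        proc.waitFor() match {
          case 50 => println("worker died from an uncaught exception")
          case 52 => println("worker ran out of memory")
          case 53 => println("worker's DiskStore failed to create a local dir")
          case c  => println(s"worker exited with code $c")
        }
      }
    }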

http://git-wip-us.apache.org/repos/asf/spark/blob/f80dcf2a/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala
deleted file mode 100644
index b0e984c..0000000
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.executor
-
-import org.apache.spark.Logging
-import org.apache.spark.util.Utils
-
-/**
- * The default uncaught exception handler for Executors terminates the whole process, to avoid
- * getting into a bad state indefinitely. Since Executors are relatively lightweight, it's better
- * to fail fast when things go wrong.
- */
-private[spark] object ExecutorUncaughtExceptionHandler
-  extends Thread.UncaughtExceptionHandler with Logging {
-
-  override def uncaughtException(thread: Thread, exception: Throwable) {
-    try {
-      logError("Uncaught exception in thread " + thread, exception)
-
-      // We may have been called from a shutdown hook. If so, we must not call System.exit().
-      // (If we do, we will deadlock.)
-      if (!Utils.inShutdown()) {
-        if (exception.isInstanceOf[OutOfMemoryError]) {
-          System.exit(ExecutorExitCode.OOM)
-        } else {
-          System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
-        }
-      }
-    } catch {
-      case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
-      case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
-    }
-  }
-
-  def uncaughtException(exception: Throwable) {
-    uncaughtException(Thread.currentThread(), exception)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f80dcf2a/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala b/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
new file mode 100644
index 0000000..c93b1cc
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+private[spark] object SparkExitCode {
+  /** The default uncaught exception handler was reached. */
+  val UNCAUGHT_EXCEPTION = 50
+
+  /** The default uncaught exception handler was called and an exception was encountered while
+      logging the exception. */
+  val UNCAUGHT_EXCEPTION_TWICE = 51
+
+  /** The default uncaught exception handler was reached, and the uncaught exception was an
+      OutOfMemoryError. */
+  val OOM = 52
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f80dcf2a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
new file mode 100644
index 0000000..ad3db1f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.apache.spark.Logging
+
+/**
+ * The default uncaught exception handler for Executors terminates the whole process, to avoid
+ * getting into a bad state indefinitely. Since Executors are relatively lightweight, it's better
+ * to fail fast when things go wrong.
+ */
+private[spark] object SparkUncaughtExceptionHandler
+  extends Thread.UncaughtExceptionHandler with Logging {
+
+  override def uncaughtException(thread: Thread, exception: Throwable) {
+    try {
+      logError("Uncaught exception in thread " + thread, exception)
+
+      // We may have been called from a shutdown hook. If so, we must not call System.exit().
+      // (If we do, we will deadlock.)
+      if (!Utils.inShutdown()) {
+        if (exception.isInstanceOf[OutOfMemoryError]) {
+          System.exit(SparkExitCode.OOM)
+        } else {
+          System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
+        }
+      }
+    } catch {
+      case oom: OutOfMemoryError => Runtime.getRuntime.halt(SparkExitCode.OOM)
+      case t: Throwable => Runtime.getRuntime.halt(SparkExitCode.UNCAUGHT_EXCEPTION_TWICE)
+    }
+  }
+
+  def uncaughtException(exception: Throwable) {
+    uncaughtException(Thread.currentThread(), exception)
+  }
+}
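
One subtlety the moved code preserves: System.exit() must not be called from inside a shutdown hook, because exit() waits for shutdown hooks to finish and would deadlock. Here is a self-contained sketch of that install-and-guard pattern; the shuttingDown flag is a simplified stand-in for Utils.inShutdown():

    object HandlerInstallDemo {
      @volatile private var shuttingDown = false

      def main(args: Array[String]): Unit = {
        // Record that shutdown has begun, so the handler can skip System.exit().
        Runtime.getRuntime.addShutdownHook(new Thread {
          override def run(): Unit = { shuttingDown = true }
        })

        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
          override def uncaughtException(thread: Thread, exception: Throwable): Unit = {
            System.err.println(s"Uncaught exception in $thread: $exception")
            if (!shuttingDown) {
              exception match {
                case _: OutOfMemoryError => System.exit(52)  // SparkExitCode.OOM
                case _                   => System.exit(50)  // SparkExitCode.UNCAUGHT_EXCEPTION
              }
            }
          }
        })

        // Any thread that now dies with an uncaught exception takes the JVM down.
        new Thread {
          override def run(): Unit = throw new RuntimeException("boom")
        }.start()
      }
    }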

http://git-wip-us.apache.org/repos/asf/spark/blob/f80dcf2a/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index ccbddd9..65bdbaa 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -43,7 +43,7 @@ import org.json4s._
 import tachyon.client.{TachyonFile,TachyonFS}
 
 import org.apache.spark._
-import org.apache.spark.executor.ExecutorUncaughtExceptionHandler
+import org.apache.spark.util.SparkUncaughtExceptionHandler
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
 
 /** CallSite represents a place in user code. It can have a short and a long form. */
@@ -965,7 +965,7 @@ private[spark] object Utils extends Logging {
       block
     } catch {
       case e: ControlThrowable => throw e
-      case t: Throwable => ExecutorUncaughtExceptionHandler.uncaughtException(t)
+      case t: Throwable => SparkUncaughtExceptionHandler.uncaughtException(t)
     }
   }
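
The ControlThrowable re-throw in tryOrExit above is easy to miss but matters: Scala implements constructs such as scala.util.control.Breaks with throwables, and treating those as fatal would turn ordinary control flow into a JVM exit. A small demo follows; tryOrExitDemo is a stand-in that prints instead of exiting:

    import scala.util.control.ControlThrowable
    import scala.util.control.Breaks._

    object ControlFlowDemo {
      // Simplified tryOrExit: control-flow throwables pass through untouched.
      def tryOrExitDemo(block: => Unit): Unit = {
        try {
          block
        } catch {
          case e: ControlThrowable => throw e
          case t: Throwable => println(s"would exit the JVM: $t")
        }
      }

      def main(args: Array[String]): Unit = {
        breakable {
          for (i <- 1 to 10) {
            // break() throws a ControlThrowable; if tryOrExitDemo swallowed it,
            // this loop could never be broken from inside a wrapped block.
            tryOrExitDemo { if (i == 3) break() }
            println(i)
          }
        }
        println("broke out cleanly")
      }
    }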
 

