You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2013/12/14 09:42:16 UTC
[32/50] [abbrv] git commit: Rename SparkActorSystem to IndestructibleActorSystem
Rename SparkActorSystem to IndestructibleActorSystem
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/5a864e3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/5a864e3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/5a864e3f
Branch: refs/heads/master
Commit: 5a864e3fce234d19e1b371d9bab40554293546bb
Parents: f6c8c1c c9cd2af
Author: Aaron Davidson <aa...@databricks.com>
Authored: Fri Dec 6 00:16:40 2013 -0800
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Fri Dec 6 00:21:43 2013 -0800
----------------------------------------------------------------------
.../executor/CoarseGrainedExecutorBackend.scala | 3 +-
.../cluster/SimrSchedulerBackend.scala | 2 +-
.../spark/storage/BlockObjectWriter.scala | 2 +-
.../scala/org/apache/spark/util/AkkaUtils.scala | 15 ++++--
.../spark/util/IndestructibleActorSystem.scala | 55 +++++++++++++++++++
.../apache/spark/util/SparkActorSystem.scala | 56 --------------------
.../spark/deploy/yarn/WorkerLauncher.scala | 2 +-
7 files changed, 72 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a864e3f/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index dcb12be,406e015..debbdd4
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@@ -97,7 -97,8 +97,8 @@@ private[spark] object CoarseGrainedExec
// Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
// before getting started with all our system properties, etc
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
- useSparkAS = true)
++ indestructible = true)
// set it
val sparkHostPort = hostname + ":" + boundPort
System.setProperty("spark.hostPort", sparkHostPort)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a864e3f/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index b4451fc,b4451fc..df33f6b
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@@ -44,7 -44,7 +44,7 @@@ abstract class BlockObjectWriter(val bl
* Flush the partial writes and commit them as a single atomic block. Return the
* number of bytes written for this commit.
*/
-- def commit(): Long
++ def commit(): LongSpark
/**
* Reverts writes that haven't been flushed yet. Callers should invoke this function
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a864e3f/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 407e9ff,f3e2644..9f3f163
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@@ -17,7 -17,7 +17,7 @@@
package org.apache.spark.util
--import akka.actor.{SparkActorSystem, ActorSystem, ExtendedActorSystem}
++import akka.actor.{IndestructibleActorSystem, ActorSystem, ExtendedActorSystem}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import scala.concurrent.Await
@@@ -34,8 -34,10 +34,13 @@@ private[spark] object AkkaUtils
*
* Note: the `name` parameter is important, as even if a client sends a message to right
* host + port, if the system name is incorrect, Akka will drop the message.
++ *
++ * If indestructible is set to true, the Actor System will continue running in the event
++ * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]].
*/
- def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
- def createActorSystem(name: String, host: String, port: Int,
- useSparkAS: Boolean = false): (ActorSystem, Int) = {
++ def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false)
++ : (ActorSystem, Int) = {
+
val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt
@@@ -70,7 -72,12 +75,11 @@@
|akka.remote.log-remote-lifecycle-events = $lifecycleEvents
""".stripMargin)
- val actorSystem = SparkActorSystem(name, akkaConf)
- val actorSystem = if (useSparkAS) {
- SparkActorSystem(name, akkaConf)
- }
- else {
++ val actorSystem = if (indestructible) {
++ IndestructibleActorSystem(name, akkaConf)
++ } else {
+ ActorSystem(name, akkaConf)
+ }
val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
val boundPort = provider.getDefaultAddress.port.get
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a864e3f/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
index 0000000,0000000..6951986
new file mode 100644
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
@@@ -1,0 -1,0 +1,55 @@@
++/**
++ * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
++ */
++
++// Must be in akka.actor package as ActorSystemImpl is protected[akka].
++package akka.actor
++
++import scala.util.control.{ControlThrowable, NonFatal}
++
++import com.typesafe.config.Config
++
++/**
++ * An [[akka.actor.ActorSystem]] which refuses to shut down in the event of a fatal exception.
++ * This is necessary as Spark Executors are allowed to recover from fatal exceptions
++ * (see [[org.apache.spark.executor.Executor]]).
++ */
++object IndestructibleActorSystem {
++ def apply(name: String, config: Config): ActorSystem =
++ apply(name, config, ActorSystem.findClassLoader())
++
++ def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem =
++ new IndestructibleActorSystemImpl(name, config, classLoader).start()
++}
++
++private[akka] class IndestructibleActorSystemImpl(
++ override val name: String,
++ applicationConfig: Config,
++ classLoader: ClassLoader)
++ extends ActorSystemImpl(name, applicationConfig, classLoader) {
++
++ protected override def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = {
++ val fallbackHandler = super.uncaughtExceptionHandler
++
++ new Thread.UncaughtExceptionHandler() {
++ def uncaughtException(thread: Thread, cause: Throwable): Unit = {
++ if (isFatalError(cause) && !settings.JvmExitOnFatalError) {
++ log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " +
++ "ActorSystem [{}] tolerating and continuing.... ", thread.getName, name)
++ //shutdown() //TODO make it configurable
++ } else {
++ fallbackHandler.uncaughtException(thread, cause)
++ }
++ }
++ }
++ }
++
++ def isFatalError(e: Throwable): Boolean = {
++ e match {
++ case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable =>
++ false
++ case _ =>
++ true
++ }
++ }
++}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a864e3f/core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala
index a679fd6,d329063..0000000
deleted file mode 100644,100644
--- a/core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala
+++ /dev/null
@@@ -1,56 -1,56 +1,0 @@@
--/**
-- * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
-- */
--
--// Must be in akka.actor package as ActorSystemImpl is protected[akka].
--package akka.actor
--
--import scala.util.control.{ControlThrowable, NonFatal}
--
--import com.typesafe.config.Config
--
--/**
-- * An ActorSystem specific to Spark. Based off of [[akka.actor.ActorSystem]].
-- * The only change from the default system is that we do not shut down the ActorSystem
-- * in the event of a fatal exception. This is necessary as Spark is allowed to recover
-- * from fatal exceptions (see [[org.apache.spark.executor.Executor]]).
-- */
--object SparkActorSystem {
-- def apply(name: String, config: Config): ActorSystem =
-- apply(name, config, ActorSystem.findClassLoader())
--
-- def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem =
-- new SparkActorSystemImpl(name, config, classLoader).start()
--}
--
--private[akka] class SparkActorSystemImpl(
-- override val name: String,
-- applicationConfig: Config,
-- classLoader: ClassLoader)
-- extends ActorSystemImpl(name, applicationConfig, classLoader) {
--
-- protected override def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = {
-- val fallbackHandler = super.uncaughtExceptionHandler
--
-- new Thread.UncaughtExceptionHandler() {
-- def uncaughtException(thread: Thread, cause: Throwable): Unit = {
-- if (isFatalError(cause) && !settings.JvmExitOnFatalError) {
-- log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " +
- "ActorSystem tolerating and continuing.... [{}]", thread.getName, name)
- //shutdown() //TODO make it configurable
- } else {
- fallbackHandler.uncaughtException(thread, cause)
- }
- }
- }
- }
-
- def isFatalError(e: Throwable): Boolean = {
- e match {
- case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable =>
- false
- case _ =>
- true
- }
- }
- }
- "ActorSystem [{}] tolerating and continuing.... ", thread.getName, name)
- //shutdown() //TODO make it configurable
- } else {
- fallbackHandler.uncaughtException(thread, cause)
- }
- }
- }
- }
-
- def isFatalError(e: Throwable): Boolean = {
- e match {
- case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable =>
- false
- case _ =>
- true
- }
- }
-}