Posted to commits@spark.apache.org by pw...@apache.org on 2013/12/14 09:42:16 UTC

[32/50] [abbrv] git commit: Rename SparkActorSystem to IndestructibleActorSystem

Rename SparkActorSystem to IndestructibleActorSystem


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/5a864e3f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/5a864e3f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/5a864e3f

Branch: refs/heads/master
Commit: 5a864e3fce234d19e1b371d9bab40554293546bb
Parents: f6c8c1c c9cd2af
Author: Aaron Davidson <aa...@databricks.com>
Authored: Fri Dec 6 00:16:40 2013 -0800
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Fri Dec 6 00:21:43 2013 -0800

----------------------------------------------------------------------
 .../executor/CoarseGrainedExecutorBackend.scala |  3 +-
 .../cluster/SimrSchedulerBackend.scala          |  2 +-
 .../spark/storage/BlockObjectWriter.scala       |  2 +-
 .../scala/org/apache/spark/util/AkkaUtils.scala | 15 ++++--
 .../spark/util/IndestructibleActorSystem.scala  | 55 +++++++++++++++++++
 .../apache/spark/util/SparkActorSystem.scala    | 56 --------------------
 .../spark/deploy/yarn/WorkerLauncher.scala      |  2 +-
 7 files changed, 72 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a864e3f/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index dcb12be,406e015..debbdd4
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@@ -97,7 -97,8 +97,8 @@@ private[spark] object CoarseGrainedExec
  
      // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
      // before getting started with all our system properties, etc
-     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
+     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
 -      useSparkAS = true)
++      indestructible = true)
      // set it
      val sparkHostPort = hostname + ":" + boundPort
      System.setProperty("spark.hostPort", sparkHostPort)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a864e3f/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index b4451fc,b4451fc..df33f6b
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@@ -44,7 -44,7 +44,7 @@@ abstract class BlockObjectWriter(val bl
     * Flush the partial writes and commit them as a single atomic block. Return the
     * number of bytes written for this commit.
     */
--  def commit(): Long
++  def commit(): Long
  
    /**
     * Reverts writes that haven't been flushed yet. Callers should invoke this function

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a864e3f/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 407e9ff,f3e2644..9f3f163
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@@ -17,7 -17,7 +17,7 @@@
  
  package org.apache.spark.util
  
--import akka.actor.{SparkActorSystem, ActorSystem, ExtendedActorSystem}
++import akka.actor.{IndestructibleActorSystem, ActorSystem, ExtendedActorSystem}
  import com.typesafe.config.ConfigFactory
  import scala.concurrent.duration._
  import scala.concurrent.Await
@@@ -34,8 -34,10 +34,13 @@@ private[spark] object AkkaUtils 
     *
     * Note: the `name` parameter is important, as even if a client sends a message to right
     * host + port, if the system name is incorrect, Akka will drop the message.
++   *
++   * If indestructible is set to true, the Actor System will continue running in the event
++   * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]].
     */
-   def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
 -  def createActorSystem(name: String, host: String, port: Int,
 -    useSparkAS: Boolean = false): (ActorSystem, Int) = {
++  def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false)
++    : (ActorSystem, Int) = {
+ 
      val akkaThreads   = System.getProperty("spark.akka.threads", "4").toInt
      val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt
  
@@@ -70,7 -72,12 +75,11 @@@
        |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
        """.stripMargin)
  
-     val actorSystem = SparkActorSystem(name, akkaConf)
 -    val actorSystem = if (useSparkAS) {
 -      SparkActorSystem(name, akkaConf)
 -    }
 -    else {
++    val actorSystem = if (indestructible) {
++      IndestructibleActorSystem(name, akkaConf)
++    } else {
+       ActorSystem(name, akkaConf)
+     }
  
      val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
      val boundPort = provider.getDefaultAddress.port.get

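For illustration, a caller opting into the fault-tolerant system just passes the new flag;
everything else keeps the old default. This is a sketch, not part of the commit: the host
and port values are placeholders, and the caller must live inside the org.apache.spark
package since AkkaUtils is private[spark]:

    import org.apache.spark.util.AkkaUtils   // only visible inside org.apache.spark

    // Executor-style caller: keep the ActorSystem alive across fatal errors.
    val (executorSystem, executorPort) =
      AkkaUtils.createActorSystem("sparkExecutor", "localhost", 0, indestructible = true)

    // Default caller: ordinary ActorSystem with Akka's standard uncaught-exception handling.
    val (driverSystem, driverPort) =
      AkkaUtils.createActorSystem("spark", "localhost", 0)
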
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a864e3f/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
index 0000000,0000000..6951986
new file mode 100644
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
@@@ -1,0 -1,0 +1,55 @@@
++/**
++ *  Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
++ */
++
++// Must be in akka.actor package as ActorSystemImpl is protected[akka].
++package akka.actor
++
++import scala.util.control.{ControlThrowable, NonFatal}
++
++import com.typesafe.config.Config
++
++/**
++ * An [[akka.actor.ActorSystem]] which refuses to shut down in the event of a fatal exception.
++ * This is necessary as Spark Executors are allowed to recover from fatal exceptions
++ * (see [[org.apache.spark.executor.Executor]]).
++ */
++object IndestructibleActorSystem {
++  def apply(name: String, config: Config): ActorSystem =
++    apply(name, config, ActorSystem.findClassLoader())
++
++  def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem =
++    new IndestructibleActorSystemImpl(name, config, classLoader).start()
++}
++
++private[akka] class IndestructibleActorSystemImpl(
++    override val name: String,
++    applicationConfig: Config,
++    classLoader: ClassLoader)
++  extends ActorSystemImpl(name, applicationConfig, classLoader) {
++
++  protected override def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = {
++    val fallbackHandler = super.uncaughtExceptionHandler
++
++    new Thread.UncaughtExceptionHandler() {
++      def uncaughtException(thread: Thread, cause: Throwable): Unit = {
++        if (isFatalError(cause) && !settings.JvmExitOnFatalError) {
++          log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " +
++            "ActorSystem [{}] tolerating and continuing.... ", thread.getName, name)
++          //shutdown()                 //TODO make it configurable
++        } else {
++          fallbackHandler.uncaughtException(thread, cause)
++        }
++      }
++    }
++  }
++
++  def isFatalError(e: Throwable): Boolean = {
++    e match {
++      case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable =>
++        false
++      case _ =>
++        true
++    }
++  }
++}

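The handler above only swallows throwables that isFatalError classifies as fatal; anything
NonFatal is still delegated to Akka's fallback handler. A standalone sketch of that
classification (illustrative only, mirroring the method in the new file):

    import scala.util.control.{ControlThrowable, NonFatal}

    def isFatalError(e: Throwable): Boolean = e match {
      // NonFatal matches ordinary exceptions; the extra types listed here are also treated
      // as non-fatal even though the NonFatal extractor does not cover them.
      case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable =>
        false
      case _ =>
        true
    }

    isFatalError(new RuntimeException("task failed"))  // false: goes to the fallback handler
    isFatalError(new OutOfMemoryError("heap"))         // true: logged and tolerated by the
                                                       // override when JvmExitOnFatalError is off
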
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5a864e3f/core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala
index a679fd6,d329063..0000000
deleted file mode 100644,100644
--- a/core/src/main/scala/org/apache/spark/util/SparkActorSystem.scala
+++ /dev/null
@@@ -1,56 -1,56 +1,0 @@@
--/**
-- *  Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
-- */
--
--// Must be in akka.actor package as ActorSystemImpl is protected[akka].
--package akka.actor
--
--import scala.util.control.{ControlThrowable, NonFatal}
--
--import com.typesafe.config.Config
--
--/**
-- * An ActorSystem specific to Spark. Based off of [[akka.actor.ActorSystem]].
-- * The only change from the default system is that we do not shut down the ActorSystem
-- * in the event of a fatal exception. This is necessary as Spark is allowed to recover
-- * from fatal exceptions (see [[org.apache.spark.executor.Executor]]).
-- */
--object SparkActorSystem {
--  def apply(name: String, config: Config): ActorSystem =
--    apply(name, config, ActorSystem.findClassLoader())
--
--  def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem =
--    new SparkActorSystemImpl(name, config, classLoader).start()
--}
--
--private[akka] class SparkActorSystemImpl(
--    override val name: String,
--    applicationConfig: Config,
--    classLoader: ClassLoader)
--  extends ActorSystemImpl(name, applicationConfig, classLoader) {
--
--  protected override def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = {
--    val fallbackHandler = super.uncaughtExceptionHandler
--
--    new Thread.UncaughtExceptionHandler() {
--      def uncaughtException(thread: Thread, cause: Throwable): Unit = {
--        if (isFatalError(cause) && !settings.JvmExitOnFatalError) {
--          log.error(cause, "Uncaught fatal error from thread [{}] not shutting down " +
-             "ActorSystem tolerating and continuing.... [{}]", thread.getName, name)
-           //shutdown()                 //TODO make it configurable
-         } else {
-           fallbackHandler.uncaughtException(thread, cause)
-         }
-       }
-     }
-   }
- 
-   def isFatalError(e: Throwable): Boolean = {
-     e match {
-       case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable =>
-         false
-       case _ =>
-         true
-     }
-   }
- }
 -            "ActorSystem [{}] tolerating and continuing.... ", thread.getName, name)
 -          //shutdown()                 //TODO make it configurable
 -        } else {
 -          fallbackHandler.uncaughtException(thread, cause)
 -        }
 -      }
 -    }
 -  }
 -
 -  def isFatalError(e: Throwable): Boolean = {
 -    e match {
 -      case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable =>
 -        false
 -      case _ =>
 -        true
 -    }
 -  }
 -}