Posted to commits@spark.apache.org by pw...@apache.org on 2013/12/14 09:41:47 UTC

[03/50] [abbrv] git commit: Enabled remote death watch and added a way to configure the timeouts for Akka heartbeats.

Enabled remote death watch and added a way to configure the timeouts for Akka heartbeats.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a8bfdd43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a8bfdd43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a8bfdd43

Branch: refs/heads/master
Commit: a8bfdd4377918bf665d4615c7b45ed84a7a9cebc
Parents: c77ca1f
Author: Prashant Sharma <pr...@imaginea.com>
Authored: Tue Nov 12 11:38:20 2013 +0530
Committer: Prashant Sharma <pr...@imaginea.com>
Committed: Tue Nov 12 12:04:00 2013 +0530

----------------------------------------------------------------------
 .../executor/StandaloneExecutorBackend.scala    |  4 +-
 .../cluster/StandaloneSchedulerBackend.scala    |  5 ---
 .../scala/org/apache/spark/util/AkkaUtils.scala | 44 +++++++++++---------
 3 files changed, 27 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
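
The three tunables this commit introduces are read from JVM system
properties in AkkaUtils (last hunk below). A minimal sketch of how a driver
process could raise them; the property names and defaults come from the
diff, while the object name and the chosen values are illustrative:

    object TuneAkkaFailureDetection {
      def main(args: Array[String]): Unit = {
        // Tolerate up to 60 s of missed heartbeats before a watched peer
        // is declared dead (patch default: 30).
        System.setProperty("spark.akka.pauses", "60")
        // Failure detector threshold; higher means less sensitive to
        // transient pauses such as long GCs (patch default: 30).
        System.setProperty("spark.akka.failure-detector.threshold", "100")
        // Heartbeat interval in seconds (patch default: 3).
        System.setProperty("spark.akka.heartbeat.interval", "5")
        // Create the SparkContext / ActorSystem only after this point, so
        // AkkaUtils sees the overridden values when it builds its config.
      }
    }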


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a8bfdd43/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
index f705a56..a76a8e9 100644
--- a/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
@@ -59,12 +59,12 @@ private[spark] class StandaloneExecutorBackend(
     driver = context.actorSelection(driverUrl)
     driver ! RegisterExecutor(executorId, hostPort, cores)
     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
-   // context.watch(driver) // Doesn't work with remote actors, but useful for testing
   }
 
   override def receive = {
     case RegisteredExecutor(sparkProperties) =>
       logInfo("Successfully registered with driver")
+      context.watch(sender) // Start watching for Terminated messages.
       // Make this host instead of hostPort ?
       executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties)
 
@@ -81,7 +81,7 @@ private[spark] class StandaloneExecutorBackend(
         executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
       }
 
-    case DisassociatedEvent(_, _, _) =>
+    case Terminated(actor) =>
       logError("Driver terminated or disconnected! Shutting down.")
       System.exit(1)
   }
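
Swapping DisassociatedEvent for Terminated relies on Akka's remote death
watch: once the backend calls context.watch on the driver's ActorRef, loss
of the remote peer arrives as an ordinary Terminated message. A
stripped-down sketch of that pattern (Akka 2.2-era API; this actor is
illustrative, not part of the patch):

    import akka.actor.{Actor, ActorRef, Terminated}

    class RemoteWatcher extends Actor {
      def receive = {
        case peer: ActorRef =>
          // With remoting heartbeats enabled, death watch works across the
          // wire: if the peer dies or becomes unreachable, Terminated fires.
          context.watch(peer)
        case Terminated(peer) =>
          // One code path covers both clean shutdown and heartbeat failure.
          println(s"$peer terminated or became unreachable; shutting down")
          context.stop(self)
      }
    }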

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a8bfdd43/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index b6f0ec9..2d09b32 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -102,11 +102,6 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
       case Terminated(actor) =>
         actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
 
-      case DisassociatedEvent(_, remoteAddress, _) =>
-        addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client disconnected"))
-
-      case AssociationErrorEvent(_, _, remoteAddress, _) =>
-        addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client shutdown"))
     }
 
     // Make fake resource offers on all executors
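
On the driver side, the DisassociatedEvent and AssociationErrorEvent cases
can go because the Terminated case above now fires for watched executor
actors too. The registration side of that bookkeeping is outside this hunk;
a plausible shape, with actorToExecutorId taken from the context line above
and everything else illustrative:

    import akka.actor.{Actor, ActorRef, Terminated}
    import scala.collection.mutable

    class DriverActorSketch extends Actor {
      private val actorToExecutorId = new mutable.HashMap[ActorRef, String]

      def receive = {
        case executorId: String =>  // simplified registration message
          // Watching the executor's actor converts its loss into Terminated.
          context.watch(sender)
          actorToExecutorId(sender) = executorId
        case Terminated(actor) =>
          actorToExecutorId.get(actor).foreach(id =>
            println(s"removing executor $id: Akka actor terminated"))
      }
    }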

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a8bfdd43/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 8daf50a..2a83138 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -43,29 +43,35 @@ private[spark] object AkkaUtils {
 
     val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
     val lifecycleEvents = if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
-    // 10 seconds is the default akka timeout, but in a cluster, we need higher by default.
-    val akkaWriteTimeout = System.getProperty("spark.akka.writeTimeout", "30").toInt
 
-    val akkaConf = ConfigFactory.parseString("""
-      akka.daemonic = on
-      akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
-      akka.stdout-loglevel = "ERROR"
-      akka.actor.provider = "akka.remote.RemoteActorRefProvider"
-      akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
-      akka.remote.netty.tcp.hostname = "%s"
-      akka.remote.netty.tcp.port = %d
-      akka.remote.netty.tcp.connection-timeout = %d s
-      akka.remote.netty.tcp.maximum-frame-size = %dMiB
-      akka.remote.netty.tcp.execution-pool-size = %d
-      akka.actor.default-dispatcher.throughput = %d
-      akka.remote.log-remote-lifecycle-events = %s
-                                             """.format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize,
-        lifecycleEvents))
+    val akkaHeartBeatPauses = System.getProperty("spark.akka.pauses", "30").toInt
+    val akkaFailureDetector = System.getProperty("spark.akka.failure-detector.threshold", "30").toInt
+    val akkaHeartBeatInterval = System.getProperty("spark.akka.heartbeat.interval", "3").toInt
+
+    val akkaConf = ConfigFactory.parseString(
+      s"""
+      |akka.daemonic = on
+      |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
+      |akka.stdout-loglevel = "ERROR"
+      |akka.remote.watch-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
+      |akka.remote.watch-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
+      |akka.remote.watch-failure-detector.threshold = $akkaFailureDetector
+      |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
+      |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
+      |akka.remote.transport-failure-detector.threshold = $akkaFailureDetector
+      |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
+      |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
+      |akka.remote.netty.tcp.hostname = "$host"
+      |akka.remote.netty.tcp.port = $port
+      |akka.remote.netty.tcp.connection-timeout = $akkaTimeout s
+      |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}MiB
+      |akka.remote.netty.tcp.execution-pool-size = $akkaThreads
+      |akka.actor.default-dispatcher.throughput = $akkaBatchSize
+      |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
+      """.stripMargin)
 
     val actorSystem = ActorSystem(name, akkaConf)
 
-    // Figure out the port number we bound to, in case port was passed as 0. This is a bit of a
-    // hack because Akka doesn't let you figure out the port through the public API yet.
     val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
     val boundPort = provider.getDefaultAddress.port.get
     (actorSystem, boundPort)
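
The durations in the rewritten config block use HOCON unit syntax ("30 s"),
which the Typesafe Config library parses directly. A self-contained check
(illustrative, not part of the patch) that the new failure-detector keys
resolve as expected, using the patch defaults:

    import com.typesafe.config.ConfigFactory

    object FailureDetectorConfigCheck {
      def main(args: Array[String]): Unit = {
        val conf = ConfigFactory.parseString(
          """
          |akka.remote.watch-failure-detector.acceptable-heartbeat-pause = 30 s
          |akka.remote.watch-failure-detector.heartbeat-interval = 3 s
          |akka.remote.watch-failure-detector.threshold = 30
          """.stripMargin)
        // Config 1.0-era API: durations come back as milliseconds.
        println(conf.getMilliseconds(
          "akka.remote.watch-failure-detector.acceptable-heartbeat-pause")) // 30000
        println(conf.getDouble("akka.remote.watch-failure-detector.threshold")) // 30.0
      }
    }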