You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2013/12/14 09:41:47 UTC
[03/50] [abbrv] git commit: Enabled remote death watch and added a way to
configure the timeouts for Akka heartbeats.
Enabled remote death watch and added a way to configure the timeouts for Akka heartbeats.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/a8bfdd43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/a8bfdd43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/a8bfdd43
Branch: refs/heads/master
Commit: a8bfdd4377918bf665d4615c7b45ed84a7a9cebc
Parents: c77ca1f
Author: Prashant Sharma <pr...@imaginea.com>
Authored: Tue Nov 12 11:38:20 2013 +0530
Committer: Prashant Sharma <pr...@imaginea.com>
Committed: Tue Nov 12 12:04:00 2013 +0530
----------------------------------------------------------------------
.../executor/StandaloneExecutorBackend.scala | 4 +-
.../cluster/StandaloneSchedulerBackend.scala | 5 ---
.../scala/org/apache/spark/util/AkkaUtils.scala | 44 +++++++++++---------
3 files changed, 27 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a8bfdd43/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
index f705a56..a76a8e9 100644
--- a/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
@@ -59,12 +59,12 @@ private[spark] class StandaloneExecutorBackend(
driver = context.actorSelection(driverUrl)
driver ! RegisterExecutor(executorId, hostPort, cores)
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
- // context.watch(driver) // Doesn't work with remote actors, but useful for testing
}
override def receive = {
case RegisteredExecutor(sparkProperties) =>
logInfo("Successfully registered with driver")
+ context.watch(sender) //Start watching for terminated messages.
// Make this host instead of hostPort ?
executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties)
@@ -81,7 +81,7 @@ private[spark] class StandaloneExecutorBackend(
executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
}
- case DisassociatedEvent(_, _, _) =>
+ case Terminated(actor) =>
logError("Driver terminated or disconnected! Shutting down.")
System.exit(1)
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a8bfdd43/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index b6f0ec9..2d09b32 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -102,11 +102,6 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
case Terminated(actor) =>
actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
- case DisassociatedEvent(_, remoteAddress, _) =>
- addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client disconnected"))
-
- case AssociationErrorEvent(_, _, remoteAddress, _) =>
- addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client shutdown"))
}
// Make fake resource offers on all executors
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/a8bfdd43/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 8daf50a..2a83138 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -43,29 +43,35 @@ private[spark] object AkkaUtils {
val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
val lifecycleEvents = if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
- // 10 seconds is the default akka timeout, but in a cluster, we need higher by default.
- val akkaWriteTimeout = System.getProperty("spark.akka.writeTimeout", "30").toInt
- val akkaConf = ConfigFactory.parseString("""
- akka.daemonic = on
- akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
- akka.stdout-loglevel = "ERROR"
- akka.actor.provider = "akka.remote.RemoteActorRefProvider"
- akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
- akka.remote.netty.tcp.hostname = "%s"
- akka.remote.netty.tcp.port = %d
- akka.remote.netty.tcp.connection-timeout = %d s
- akka.remote.netty.tcp.maximum-frame-size = %dMiB
- akka.remote.netty.tcp.execution-pool-size = %d
- akka.actor.default-dispatcher.throughput = %d
- akka.remote.log-remote-lifecycle-events = %s
- """.format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize,
- lifecycleEvents))
+ val akkaHeartBeatPauses = System.getProperty("spark.akka.pauses", "30").toInt
+ val akkaFailureDetector = System.getProperty("spark.akka.failure-detector.threshold", "30").toInt
+ val akkaHeartBeatInterval = System.getProperty("spark.akka.heartbeat.interval", "3").toInt
+
+ val akkaConf = ConfigFactory.parseString(
+ s"""
+ |akka.daemonic = on
+ |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
+ |akka.stdout-loglevel = "ERROR"
+ |akka.remote.watch-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
+ |akka.remote.watch-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
+ |akka.remote.watch-failure-detector.threshold = $akkaFailureDetector
+ |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
+ |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
+ |akka.remote.transport-failure-detector.threshold = $akkaFailureDetector
+ |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
+ |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
+ |akka.remote.netty.tcp.hostname = "$host"
+ |akka.remote.netty.tcp.port = $port
+ |akka.remote.netty.tcp.connection-timeout = $akkaTimeout s
+ |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}MiB
+ |akka.remote.netty.tcp.execution-pool-size = $akkaThreads
+ |akka.actor.default-dispatcher.throughput = $akkaBatchSize
+ |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
+ """.stripMargin)
val actorSystem = ActorSystem(name, akkaConf)
- // Figure out the port number we bound to, in case port was passed as 0. This is a bit of a
- // hack because Akka doesn't let you figure out the port through the public API yet.
val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
val boundPort = provider.getDefaultAddress.port.get
(actorSystem, boundPort)